diff --git a/seed/vaultwarden/dictionaries/40_vaultwarden.xml b/seed/vaultwarden/dictionaries/40_vaultwarden.xml
index aeab9bcf..71ea3d0f 100644
--- a/seed/vaultwarden/dictionaries/40_vaultwarden.xml
+++ b/seed/vaultwarden/dictionaries/40_vaultwarden.xml
@@ -6,6 +6,7 @@
/etc/pki/ca-trust/source/anchors/ca_InternalReverseProxy.crt
/tmpfiles.d/0vaultwarden.conf
/etc/vaultwarden/config.env
+ /tests/vaultwarden.yml
@@ -27,13 +28,13 @@
-
20
Vaultwarden
+
@@ -50,8 +51,9 @@
vaultwarden_admin_password
hide_secret
-
- vaultwarden_device_identifier
+
+ domain_name_eth0
+ vaultwarden_test_device_identifier
True
diff --git a/seed/vaultwarden/funcs/risotto_setting.py b/seed/vaultwarden/funcs/risotto_setting.py
deleted file mode 100644
index a400771c..00000000
--- a/seed/vaultwarden/funcs/risotto_setting.py
+++ /dev/null
@@ -1,6 +0,0 @@
-from uuid import uuid4 as _uuid4
-
-
-def gen_uuid():
- return str(_uuid4())
-
diff --git a/seed/vaultwarden/funcs/vaultwarden.py b/seed/vaultwarden/funcs/vaultwarden.py
index a400771c..2b711125 100644
--- a/seed/vaultwarden/funcs/vaultwarden.py
+++ b/seed/vaultwarden/funcs/vaultwarden.py
@@ -1,6 +1,22 @@
+import __main__
+from os.path import dirname as _dirname, abspath as _abspath, join as _join, isfile as _isfile, isdir as _isdir
+from os import makedirs as _makedirs
from uuid import uuid4 as _uuid4
-def gen_uuid():
- return str(_uuid4())
+_HERE = _dirname(_abspath(__main__.__file__))
+_PASSWORD_DIR = _join(_HERE, 'password')
+
+def get_uuid(server_name: str) -> str:
+ dir_name = _join(_PASSWORD_DIR, server_name)
+ if not _isdir(dir_name):
+ _makedirs(dir_name)
+ file_name = _join(dir_name, 'uuid')
+ if not _isfile(file_name):
+ uuid = str(_uuid4())
+ with open(file_name, 'w') as fh:
+ fh.write(uuid)
+ with open(file_name, 'r') as fh:
+ file_content = fh.read().strip()
+ return file_content
diff --git a/seed/vaultwarden/templates/vaultwarden.yml b/seed/vaultwarden/templates/vaultwarden.yml
new file mode 100644
index 00000000..2a901d4e
--- /dev/null
+++ b/seed/vaultwarden/templates/vaultwarden.yml
@@ -0,0 +1,7 @@
+url: https://%%revprox_client_external_domainname%%{revprox_client_location[0]}
+%set %%username='rougail_test@silique.fr'
+username: %%username
+password: %%get_password(server_name=%%domain_name_eth0, username=%%username, description='test', type="cleartext", hide=%%hide_secret, temporary=False)
+privkey: %%srv_dir/vaultwarden/rsa_key.pem
+uuid: %%vaultwarden_test_device_identifier
+revprox_ip: %%revprox_client_server_ip
diff --git a/seed/vaultwarden/tests/test_vaultwarden.py b/seed/vaultwarden/tests/test_vaultwarden.py
new file mode 100644
index 00000000..22cd1a37
--- /dev/null
+++ b/seed/vaultwarden/tests/test_vaultwarden.py
@@ -0,0 +1,40 @@
+from yaml import load, SafeLoader
+from os import environ
+from mookdns import MookDns
+
+from vaultwarden import VaultWarden
+
+
+def test_vaultwarden_login():
+ conf_file = f'{environ["MACHINE_TEST_DIR"]}/vaultwarden.yml'
+ with open(conf_file) as yaml:
+ data = load(yaml, Loader=SafeLoader)
+ with MookDns(data['revprox_ip']):
+ vaultwarden = VaultWarden(data['url'], data['username'], data['uuid'], data['privkey'])
+ if 'FIRST_RUN' in environ:
+ vaultwarden.register(data['password'])
+ vaultwarden.login(data['password'])
+ vaultwarden.load_organizations()
+
+
+def test_vaultwarden_collection():
+ conf_file = f'{environ["MACHINE_TEST_DIR"]}/vaultwarden.yml'
+ with open(conf_file) as yaml:
+ data = load(yaml, Loader=SafeLoader)
+ with MookDns(data['revprox_ip']):
+ vaultwarden = VaultWarden(data['url'], data['username'], data['uuid'], data['privkey'])
+ vaultwarden.login(data['password'])
+ vaultwarden.load_organizations()
+ if 'FIRST_RUN' in environ:
+ organization_id = vaultwarden.create_organization(data['username'],
+ 'test_organization',
+ )
+ vaultwarden.create_collection(organization_id,
+ 'test_collection',
+ )
+ assert len(vaultwarden.vaultwarden_organizations) == 2
+ for org in vaultwarden.vaultwarden_organizations:
+ if org is not None:
+ assert vaultwarden.vaultwarden_organizations[org]['name'] == 'test_organization'
+ assert len(vaultwarden.vaultwarden_organizations[org]['collections']) == 2
+ assert set(vaultwarden.vaultwarden_organizations[org]['collections']) == {'test_organization', 'test_collection'}
diff --git a/seed/vaultwarden/tests/vaultwarden.py b/seed/vaultwarden/tests/vaultwarden.py
new file mode 100644
index 00000000..160e3774
--- /dev/null
+++ b/seed/vaultwarden/tests/vaultwarden.py
@@ -0,0 +1,531 @@
+from typing import Union, Tuple, Optional
+#python3-crypto
+from Cryptodome.PublicKey import RSA
+from Cryptodome.Cipher import AES, PKCS1_OAEP
+from hmac import new as hmac_new
+from secrets import token_bytes
+
+from time import time
+from json import dumps
+from hashlib import pbkdf2_hmac, sha256
+#from aiohttp import ClientSession
+from requests import session
+from base64 import b64encode, b64decode
+from hkdf import hkdf_expand
+from collections import namedtuple
+from os.path import isfile
+from jwt import encode as jwt_encode, decode as jwt_decode
+
+
+
+#BITWARDEN_PRIVATE_KEY = '/var/lib/vaultwarden_rs/rsa_key.der'
+
+cipher_string_fields = {
+ 'enc_type': lambda enc_type,iv,mac,ct: int(enc_type),
+ 'iv': lambda enc_type,iv,mac,ct: iv,
+ 'mac': lambda enc_type,iv,mac,ct: mac,
+ 'ct': lambda enc_type,iv,mac,ct: ct,
+}
+CipherString = namedtuple('CipherString', cipher_string_fields.keys())
+
+
+# support pulling apart a VaultWarden 'CipherString' from the following
+# - cipher string: ".||"
+def cipher_string_from_str(cipher_string: str) -> CipherString:
+ enc_type, data = cipher_string.split('.', 1)
+ if enc_type == '2':
+ iv, ct, mac = (b64decode(sdata) for sdata in data.split('|', 2))
+
+ d = { k: fn(enc_type, iv, mac, ct) for k,fn in cipher_string_fields.items() }
+ else:
+ iv, mac = None, None
+ ct = b64decode(data)
+ d = { k: fn(enc_type, iv, mac, ct) for k, fn in cipher_string_fields.items() }
+ return CipherString(**d)
+
+
+class VaultWarden:
+ def __init__(self,
+ url: str,
+ email: str,
+ uuid: str,
+ vaultwarden_key: str,
+ ) -> None:
+ self.vaultwarden_url = url
+ self.vaultwarden_email = email.lower()
+ self.vaultwarden_uuid = uuid
+ self.vaultwarden_login = None
+ self.vaultwarden_organizations = None
+ self.vaultwarden_key = vaultwarden_key
+
+ def register(self,
+ password: str,
+ valid: bool=True,
+ ) -> None:
+ iterations = self.get_iterations()
+ master_key, hash_password = self.hash_password(password,
+ iterations,
+ )
+ # generate symmetric key
+ token = token_bytes(64)
+ enc, mac = self._get_enc_mac(master_key)
+ key = self.encrypt_symmetric(token,
+ enc=enc,
+ mac=mac,
+ )
+ # generate asymmetric key
+ asym_key = RSA.generate(2048)
+ enc_private_key = self.encrypt_symmetric(asym_key.exportKey('DER', pkcs=8),
+ enc=token[:32],
+ mac=token[32:],
+ )
+ public_key = b64encode(asym_key.publickey().exportKey('DER')).decode()
+ data = {'name': self.vaultwarden_email.split('@')[0],
+ 'email': self.vaultwarden_email,
+ 'masterPasswordHash': hash_password,
+ 'masterPasswordHint': None,
+ 'key': key,
+ 'kdf': 0,
+ 'kdfIterations': iterations,
+ 'referenceId': None,
+ 'keys': {
+ 'publicKey': public_key,
+ 'encryptedPrivateKey': enc_private_key
+ }
+ }
+ register = self._post('api/accounts/register',
+ dumps(data),
+ )
+ if 'Object' in register and register['Object'] == 'error':
+ if register["ErrorModel"]['Message'] == 'User already exists':
+ return
+ raise Exception(register["ErrorModel"]["Message"])
+ if valid and isfile(self.vaultwarden_key):
+ self.login(password)
+ # values = self.get('/api/sync')
+ # user_id = values['Profile']['Id']
+ user_id = jwt_decode(self.vaultwarden_login['access_token'],
+ algorithm="RS256",
+ #pyjwt 1
+ verify=False,
+ #pyjwt 2
+ options={"verify_signature": False},
+ )['sub']
+ now = int(time())
+ url = self.vaultwarden_url
+ if url[-1] == '/':
+ url = url[:-1]
+ data = {'nbf': now,
+ 'exp': now + 432000,
+ 'iss': f'{url}|verifyemail',
+ 'sub': user_id,
+ }
+ with open(self.vaultwarden_key, 'rb') as private_key_fh:
+ private_key = RSA.importKey(private_key_fh.read()).exportKey('PEM')
+ token = jwt_encode(data, private_key, algorithm="RS256")
+ if isinstance(token, bytes):
+ tocken = token.decode()
+ data = {'userId': user_id,
+ 'token': token,
+ }
+ self._post('api/accounts/verify-email-token', dumps(data))
+
+ def login(self,
+ password: str,
+ ) -> None:
+ iterations = self.get_iterations()
+ master_key, hash_password = self.hash_password(password,
+ iterations,
+ )
+ data = {'grant_type': 'password',
+ 'username': self.vaultwarden_email,
+ 'password': hash_password,
+ 'scope': 'api offline_access',
+ 'client_id': 'desktop',
+ 'device_type': 7,
+ 'device_identifier': self.vaultwarden_uuid,
+ 'device_name': 'risotto',
+ }
+ vaultwarden_login = self._post('identity/connect/token', data)
+ if 'Object' in vaultwarden_login and vaultwarden_login['Object'] == 'error':
+ raise Exception(f'unable to log to VaultWarden: {vaultwarden_login["ErrorModel"]["Message"]}')
+ self.vaultwarden_login = vaultwarden_login
+ self.vaultwarden_login['master_key'] = master_key
+ self.vaultwarden_login['hash_password'] = hash_password
+
+ def get_iterations(self):
+ data = self._post('api/accounts/prelogin', dumps({'email': self.vaultwarden_email}))
+ return data['KdfIterations']
+
+ def hash_password(self,
+ password: str,
+ iterations: int,
+ ) -> str:
+ master_key = pbkdf2_hmac('sha256',
+ password.encode(),
+ self.vaultwarden_email.encode(),
+ iterations,
+ )
+ passwd = pbkdf2_hmac('sha256',
+ master_key,
+ password.encode(),
+ 1,
+ )
+ return master_key, b64encode(passwd).decode()
+
+ def decrypt(self,
+ cipher_string: str,
+ organization_id: str=None,
+ ) -> None:
+ cipher = cipher_string_from_str(cipher_string)
+ if cipher.enc_type == 2:
+ return self.decrypt_symmetric(cipher,
+ organization_id,
+ )
+ elif cipher.enc_type == 4:
+ if organization_id:
+ raise Exception('cipher type {cipher.enc_type} cannot have organization_id')
+ return self.decrypt_asymmetric(cipher)
+ raise Exception(f'Unknown cipher type {cipher.enc_type}')
+
+ def decrypt_symmetric(self,
+ cipher: str,
+ organization_id: str=None,
+ enc: str=None,
+ mac: str=None,
+ ) -> bytes:
+ # i.e: AesCbc256_HmacSha256_B64 (jslib/src/enums/encryptionType.ts)
+ assert cipher.enc_type == 2
+ if enc is None:
+ enc = self.vaultwarden_organizations[organization_id]['key'][:32]
+ mac = self.vaultwarden_organizations[organization_id]['key'][32:]
+ # verify the MAC
+ cmac = hmac_new(mac,
+ cipher.iv + cipher.ct,
+ sha256,
+ )
+ assert cipher.mac == cmac.digest()
+
+ # decrypt the content
+ c = AES.new(enc,
+ AES.MODE_CBC,
+ cipher.iv,
+ )
+ plaintext = c.decrypt(cipher.ct)
+
+ # remove PKCS#7 padding from payload, see RFC 5652
+ # https://tools.ietf.org/html/rfc5652#section-6.3
+ pad_len = plaintext[-1]
+ padding = bytes([pad_len] * pad_len)
+ if plaintext[-pad_len:] == padding:
+ plaintext = plaintext[:-pad_len]
+ return plaintext
+
+ def decrypt_asymmetric(self,
+ cipher: str,
+ ) -> str:
+ private_key = self.decrypt(self.vaultwarden_login['PrivateKey'])
+ c = PKCS1_OAEP.new(RSA.importKey(private_key))
+ return c.decrypt(cipher.ct)
+
+ def encrypt_symmetric(self,
+ content: bytes,
+ organization_id: str=None,
+ enc: str=None,
+ mac: str=None,
+ ) -> None:
+ iv = token_bytes(16)
+ if enc is None:
+ enc = self.vaultwarden_organizations[organization_id]['key'][:32]
+ mac = self.vaultwarden_organizations[organization_id]['key'][32:]
+ c = AES.new(enc,
+ AES.MODE_CBC,
+ iv,
+ )
+ pad_len = 16 - len(content) % 16
+ padding = bytes([ pad_len ] * pad_len)
+ ct = c.encrypt(content + padding)
+ cmac = hmac_new(mac,
+ iv + ct,
+ sha256,
+ )
+ return f"2.{b64encode(iv).decode()}|{b64encode(ct).decode()}|{b64encode(cmac.digest()).decode()}"
+
+ def encrypt_asymmetric(self,
+ plaintext: str,
+ key: str,
+ ) -> str:
+ rsa_key = RSA.importKey(key)
+ cipher = PKCS1_OAEP.new(rsa_key).encrypt(plaintext)
+ b64_cipher = b64encode(cipher).decode()
+ return f"4.{b64_cipher}"
+
+ def get(self,
+ url: str,
+ ) -> None:
+ with session() as req:
+ resp = req.get(self.vaultwarden_url + url, headers=self._get_headers())
+ assert resp.status_code == 200
+ try:
+ response = resp.json()
+ except:
+ response = resp.text
+ return response
+
+ def _post(self,
+ url: str,
+ data: dict,
+ ) -> None:
+ with session() as req:
+ resp = req.post(self.vaultwarden_url + url,
+ data=data,
+ headers=self._get_headers(),
+ )
+ assert resp.status_code == 200, f'unable to post to url {self.vaultwarden_url}{url} with data {data}: {resp.text}'
+ try:
+ response = resp.json()
+ except:
+ response = resp.text
+ return response
+
+ def _put(self,
+ url: str,
+ data: dict,
+ ) -> None:
+ with session() as req:
+ resp = req.put(self.vaultwarden_url + url,
+ data=data,
+ headers=self._get_headers(),
+ )
+ try:
+ response = resp.json()
+ except:
+ response = resp.text
+ return response
+
+ def _get_headers(self,
+ ) -> None:
+ if self.vaultwarden_login == None:
+ return None
+ return {'Authorization': f'Bearer {self.vaultwarden_login["access_token"]}'}
+
+ def load_organizations(self,
+ only_default: bool=False,
+ ) -> None:
+ values = self.get('/api/sync')
+ enc, mac = self._get_enc_mac(self.vaultwarden_login['master_key'])
+ # 'decrypt' the user_key to produce the actual keys
+ cipher = cipher_string_from_str(self.vaultwarden_login['Key'])
+ plaintext_userkey = self.decrypt_symmetric(cipher,
+ enc=enc,
+ mac=mac,
+ )
+ assert len(plaintext_userkey) == 64
+ self.vaultwarden_organizations = {None: {'key': plaintext_userkey, 'name': 'default', 'collections': {}}}
+ if not only_default:
+ for organization in values['Profile']['Organizations']:
+ plaintext = self.decrypt(organization['Key'])
+ self._add_organization(plaintext,
+ organization,
+ )
+ for collection in values['Collections']:
+ name = self.decrypt(collection['Name'],
+ collection['OrganizationId'],
+ ).decode()
+ self.vaultwarden_organizations[collection['OrganizationId']]['collections'][name] = collection['Id']
+
+ def _get_enc_mac(self,
+ master_key: str,
+ ) -> tuple:
+ enc = hkdf_expand(master_key,
+ b'enc',
+ 32,
+ sha256,
+ )
+ mac = hkdf_expand(master_key,
+ b'mac',
+ 32,
+ sha256,
+ )
+ return enc, mac
+
+ def _add_organization(self,
+ plaintext: bytes,
+ organization: dict,
+ ) -> None:
+ organization_id = organization['Id']
+ self.vaultwarden_organizations[organization_id] = {'name': organization['Name'], 'key': plaintext, 'collections': {}}
+
+ def try_to_confirm(self,
+ organization_id,
+ email,
+ ) -> bool:
+ # user is now in organization
+ user = self.get_user_informations(organization_id,
+ email,
+ )
+
+ # if account exists now, confirm it
+ if user['public_key']:
+ key = self.encrypt_asymmetric(self.vaultwarden_organizations[organization_id]['key'],
+ user['public_key'],
+ )
+ data = {"key": key}
+ confirmed = self._post(f'api/organizations/{organization_id}/users/{user["user_id"]}/confirm',
+ dumps(data),
+ )
+ return user['user_id'], 'Object' not in confirmed or confirmed['Object'] != 'error'
+ return user['user_id'], False
+
+ def get_user_informations(self,
+ organization_id: str,
+ email: str,
+ ) -> None:
+ users = self.get(f'/api/organizations/{organization_id}/users')
+ for user in users['Data']:
+ if user['Email'] == email:
+ user_public_key = self.get(f'/api/users/{user["UserId"]}/public-key')
+ if not user_public_key['PublicKey']:
+ public_key = None
+ else:
+ public_key = b64decode(user_public_key['PublicKey'])
+ return {'user_id': user['Id'],
+ 'public_key': public_key,
+ }
+ raise Exception(f'unknow email {email} in organization id {organization_id}')
+
+ def create_organization(self,
+ email: str,
+ organization_name: str,
+ ) -> None:
+ private_key = self.decrypt(self.vaultwarden_login['PrivateKey'])
+ token = token_bytes(64)
+ key = self.encrypt_asymmetric(token,
+ private_key,
+ )
+ # defaut collection_name is organization_name
+ data = {
+ "key": key,
+ "collectionName": self.encrypt_symmetric(organization_name.encode(),
+ enc=token[:32],
+ mac=token[32:],
+ ),
+ "name": organization_name,
+ "billingEmail": email,
+ "planType": 0,
+ }
+ organization = self._post('api/organizations',
+ dumps(data),
+ )
+ self.load_organizations()
+ #self._add_organization(token,
+ # organization,
+ # )
+ return organization['Id']
+
+ def invite(self,
+ organization_id: str,
+ email: str,
+ ) -> bool:
+ data = {'emails': [email],
+ 'collections': [],
+ 'accessAll': False,
+ 'type': 2,
+ }
+ for collection_id in self.vaultwarden_organizations[organization_id]['collections'].values():
+ data['collections'].append({'id': collection_id,
+ 'readOnly': True,
+ 'hidePasswords': False,
+ })
+ self._post(f'api/organizations/{organization_id}/users/invite',
+ dumps(data),
+ )
+
+ def create_collection(self,
+ organization_id: str,
+ collection_name: str,
+ user_id: str=None,
+ ) -> None:
+ data = {"groups": [],
+ "name": self.encrypt_symmetric(collection_name.encode(),
+ organization_id,
+ ),
+ }
+ collection = self._post(f'api/organizations/{organization_id}/collections',
+ dumps(data),
+ )
+ self.vaultwarden_organizations[organization_id]['collections'][collection_name] = collection['Id']
+ if user_id:
+ self.inscript_collection(organization_id,
+ collection['Id'],
+ user_id,
+ )
+ return collection['Id']
+
+ def inscript_collection(self,
+ organization_id: str,
+ collection_id: str,
+ user_id: str,
+ ) -> None:
+ data = [{'id': user_id,
+ 'readOnly': True,
+ 'hidePasswords': False,
+ }]
+ self._put(f'api/organizations/{organization_id}/collections/{collection_id}/users',
+ dumps(data),
+ )
+
+ def store_password(self,
+ organization_id: str,
+ collection_id: str,
+ name: str,
+ username: str,
+ password: str,
+ uris: list=None,
+ ) -> None:
+ """create a cipher et store it in a share collection
+ """
+ # FIXME uris are encoded
+ data = {"cipher": {
+ "type": 1,
+ "folderId": None,
+ "organizationId": organization_id,
+ "name": self.encrypt_symmetric(name.encode(),
+ organization_id,
+ ),
+ "notes": None,
+ "favorite": False,
+ "login":
+ {"response": None,
+ "uris": uris,
+ "username": self.encrypt_symmetric(username.encode(),
+ organization_id,
+ ),
+ "password": self.encrypt_symmetric(password.encode(),
+ organization_id,
+ ),
+ "passwordRevisionDate": None,
+ "totp": None,
+ }
+ },
+ "collectionIds": [collection_id],
+ }
+ self._post('api/ciphers/admin',
+ dumps(data),
+ )
+
+ def get_password(self,
+ organization_id: str,
+ collection_name: str,
+ name: str,
+ username: str,
+ ) -> list:
+ if not collection_name in self.vaultwarden_organizations[organization_id]['collections']:
+ return
+ collection_id = self.vaultwarden_organizations[organization_id]['collections'][collection_name]
+ ciphers = self.get(f'api/ciphers/organization-details?organizationId={organization_id}')
+ for cipher in ciphers['Data']:
+ if collection_id in cipher['CollectionIds'] and \
+ self.decrypt(cipher['Data']['Name'], organization_id).decode() == name and \
+ self.decrypt(cipher['Data']['Username'], organization_id).decode() == username:
+ return self.decrypt(cipher['Data']['Password'], organization_id).decode()