# dataset/seed/vaultwarden/tests/vaultwarden.py
#
# 532 lines
# 22 KiB
# Python
# Raw Permalink Normal View History
#
# 2022-07-17 23:00:21 +02:00
from typing import Union, Tuple, Optional
#python3-crypto
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import AES, PKCS1_OAEP
from hmac import new as hmac_new
from secrets import token_bytes
from time import time
from json import dumps
from hashlib import pbkdf2_hmac, sha256
#from aiohttp import ClientSession
from requests import session
from base64 import b64encode, b64decode
from hkdf import hkdf_expand
from collections import namedtuple
from os.path import isfile
from jwt import encode as jwt_encode, decode as jwt_decode
#BITWARDEN_PRIVATE_KEY = '/var/lib/vaultwarden_rs/rsa_key.der'
# Per-field extractors for CipherString. Every extractor receives the same
# four parsed values and returns the value stored under its field name; the
# key order of this dict is also the field order of the namedtuple.
cipher_string_fields = {
    'enc_type': lambda enc_type, iv, mac, ct: int(enc_type),
    'iv': lambda enc_type, iv, mac, ct: iv,
    'mac': lambda enc_type, iv, mac, ct: mac,
    'ct': lambda enc_type, iv, mac, ct: ct,
}
CipherString = namedtuple('CipherString', cipher_string_fields.keys())


# support pulling apart a VaultWarden 'CipherString' from the following
#  - cipher string: "<enc_type>.<iv>|<ct>|<mac>"
def cipher_string_from_str(cipher_string: str) -> CipherString:
    """Parse a serialized cipher string into a ``CipherString`` tuple.

    For encryption type ``2`` the payload is three base64 chunks separated
    by ``|`` in the order iv, ct, mac. For any other type the whole payload
    is base64-decoded as the ciphertext and ``iv``/``mac`` are ``None``.

    :param cipher_string: text of the form ``"<enc_type>.<payload>"``
    :return: ``CipherString(enc_type, iv, mac, ct)`` with ``enc_type`` as int
    :raises ValueError: if there is no ``.`` separator, if ``enc_type`` is
        not an integer, or if a type-2 payload does not have three chunks
    """
    enc_type, data = cipher_string.split('.', 1)
    if enc_type == '2':
        iv, ct, mac = (b64decode(chunk) for chunk in data.split('|', 2))
    else:
        iv, mac = None, None
        ct = b64decode(data)
    # both branches build the tuple the same way, so do it once
    return CipherString(**{name: extract(enc_type, iv, mac, ct)
                           for name, extract in cipher_string_fields.items()})
class VaultWarden:
    """Small synchronous client for a VaultWarden server.

    The class registers and logs in one user, derives that user's keys and
    creates or reads organizations, collections and password ciphers through
    the server's HTTP API.

    Request paths are appended to ``url``. Slashes at the junction are
    normalized, so ``'api/x'`` and ``'/api/x'`` address the same endpoint.
    """

    def __init__(self,
                 url: str,
                 email: str,
                 uuid: str,
                 vaultwarden_key: str,
                 ) -> None:
        """Store the connection settings; no request is made here.

        :param url: base URL of the server
        :param email: account email, stored lowercased
        :param uuid: value sent as ``device_identifier`` at login
        :param vaultwarden_key: path of the RSA private key file used by
            :meth:`register` to sign the email verification token
        """
        self.vaultwarden_url = url
        self.vaultwarden_email = email.lower()
        self.vaultwarden_uuid = uuid
        # filled by login(): token response plus 'master_key'/'hash_password'
        self.vaultwarden_login = None
        # filled by load_organizations(): {organization_id or None: {...}}
        self.vaultwarden_organizations = None
        self.vaultwarden_key = vaultwarden_key

    def register(self,
                 password: str,
                 valid: bool = True,
                 ) -> None:
        """Register the account and optionally verify its email address.

        An already existing user is not an error: the method returns
        silently in that case.

        :param password: clear-text master password
        :param valid: when true and the key file ``vaultwarden_key`` exists,
            log in and post a self-signed email verification token
        :raises Exception: with the server message on any other error
        """
        iterations = self.get_iterations()
        master_key, hash_password = self.hash_password(password,
                                                       iterations,
                                                       )
        # generate symmetric key
        token = token_bytes(64)
        enc, mac = self._get_enc_mac(master_key)
        key = self.encrypt_symmetric(token,
                                     enc=enc,
                                     mac=mac,
                                     )
        # generate asymmetric key; the private part is encrypted with the
        # symmetric key generated above (first half enc, second half mac)
        asym_key = RSA.generate(2048)
        enc_private_key = self.encrypt_symmetric(asym_key.exportKey('DER', pkcs=8),
                                                 enc=token[:32],
                                                 mac=token[32:],
                                                 )
        public_key = b64encode(asym_key.publickey().exportKey('DER')).decode()
        data = {'name': self.vaultwarden_email.split('@')[0],
                'email': self.vaultwarden_email,
                'masterPasswordHash': hash_password,
                'masterPasswordHint': None,
                'key': key,
                # NOTE(review): presumably 0 selects PBKDF2-SHA256 - confirm
                'kdf': 0,
                'kdfIterations': iterations,
                'referenceId': None,
                'keys': {
                    'publicKey': public_key,
                    'encryptedPrivateKey': enc_private_key,
                },
                }
        response = self._post('api/accounts/register',
                              dumps(data),
                              )
        if self._is_error(response):
            if response["ErrorModel"]['Message'] == 'User already exists':
                return
            raise Exception(response["ErrorModel"]["Message"])
        if valid and isfile(self.vaultwarden_key):
            self.login(password)
            # the user id is read from the access token without verifying
            # its signature; both the pyjwt 1 and pyjwt 2 spellings are passed
            user_id = jwt_decode(self.vaultwarden_login['access_token'],
                                 algorithm="RS256",
                                 # pyjwt 1
                                 verify=False,
                                 # pyjwt 2
                                 options={"verify_signature": False},
                                 )['sub']
            now = int(time())
            url = self.vaultwarden_url
            if url.endswith('/'):
                url = url[:-1]
            data = {'nbf': now,
                    # 432000 seconds = 5 days
                    'exp': now + 432000,
                    'iss': f'{url}|verifyemail',
                    'sub': user_id,
                    }
            with open(self.vaultwarden_key, 'rb') as private_key_fh:
                private_key = RSA.importKey(private_key_fh.read()).exportKey('PEM')
            token = jwt_encode(data, private_key, algorithm="RS256")
            # pyjwt 1 returns bytes, which json cannot serialize
            if isinstance(token, bytes):
                token = token.decode()
            data = {'userId': user_id,
                    'token': token,
                    }
            self._post('api/accounts/verify-email-token', dumps(data))

    def login(self,
              password: str,
              ) -> None:
        """Log in and keep the token response in ``vaultwarden_login``.

        The stored dict is the server response with two extra keys,
        ``'master_key'`` and ``'hash_password'``.

        :param password: clear-text master password
        :raises Exception: when the server answers with an error object
        """
        iterations = self.get_iterations()
        master_key, hash_password = self.hash_password(password,
                                                       iterations,
                                                       )
        # sent as a dict (not JSON), so requests form-encodes it
        data = {'grant_type': 'password',
                'username': self.vaultwarden_email,
                'password': hash_password,
                'scope': 'api offline_access',
                'client_id': 'desktop',
                'device_type': 7,
                'device_identifier': self.vaultwarden_uuid,
                'device_name': 'risotto',
                }
        vaultwarden_login = self._post('identity/connect/token', data)
        if self._is_error(vaultwarden_login):
            raise Exception(f'unable to log to VaultWarden: {vaultwarden_login["ErrorModel"]["Message"]}')
        self.vaultwarden_login = vaultwarden_login
        self.vaultwarden_login['master_key'] = master_key
        self.vaultwarden_login['hash_password'] = hash_password

    def get_iterations(self) -> int:
        """Return the ``KdfIterations`` value of the prelogin response."""
        data = self._post('api/accounts/prelogin', dumps({'email': self.vaultwarden_email}))
        return data['KdfIterations']

    def hash_password(self,
                      password: str,
                      iterations: int,
                      ) -> Tuple[bytes, str]:
        """Derive the master key and the password hash sent to the server.

        :param password: clear-text master password
        :param iterations: PBKDF2 iteration count for the master key
        :return: ``(master_key, hash)`` where ``master_key`` is
            PBKDF2-HMAC-SHA256 of the password salted with the email, and
            ``hash`` is the base64 text of one more PBKDF2 round of the
            master key salted with the password
        """
        password_bytes = password.encode()
        master_key = pbkdf2_hmac('sha256',
                                 password_bytes,
                                 self.vaultwarden_email.encode(),
                                 iterations,
                                 )
        passwd = pbkdf2_hmac('sha256',
                             master_key,
                             password_bytes,
                             1,
                             )
        return master_key, b64encode(passwd).decode()

    def decrypt(self,
                cipher_string: str,
                organization_id: Optional[str] = None,
                ) -> bytes:
        """Decrypt a serialized cipher string.

        Type 2 is handled by :meth:`decrypt_symmetric`, type 4 by
        :meth:`decrypt_asymmetric`.

        :param cipher_string: text accepted by ``cipher_string_from_str``
        :param organization_id: organization whose key is used for type 2;
            ``None`` selects the user's own key
        :raises Exception: for an unknown type, or when a type 4 cipher is
            given an ``organization_id``
        """
        cipher = cipher_string_from_str(cipher_string)
        if cipher.enc_type == 2:
            return self.decrypt_symmetric(cipher,
                                          organization_id,
                                          )
        if cipher.enc_type == 4:
            if organization_id:
                raise Exception(f'cipher type {cipher.enc_type} cannot have organization_id')
            return self.decrypt_asymmetric(cipher)
        raise Exception(f'Unknown cipher type {cipher.enc_type}')

    def decrypt_symmetric(self,
                          cipher: 'CipherString',
                          organization_id: Optional[str] = None,
                          enc: Optional[bytes] = None,
                          mac: Optional[bytes] = None,
                          ) -> bytes:
        """Verify the MAC of a type 2 cipher and decrypt it with AES-CBC.

        :param cipher: parsed cipher with ``enc_type``, ``iv``, ``ct``, ``mac``
        :param organization_id: used only when ``enc`` is ``None``; the
            64-byte key stored for that organization is split into the
            encryption key (first 32 bytes) and the MAC key (the rest)
        :param enc: explicit AES key
        :param mac: explicit HMAC key, expected together with ``enc``
        :return: plaintext with trailing PKCS#7 padding removed when present
        :raises AssertionError: if the type is not 2 or the MAC mismatches
        """
        from hmac import compare_digest

        # i.e: AesCbc256_HmacSha256_B64 (jslib/src/enums/encryptionType.ts)
        if cipher.enc_type != 2:
            raise AssertionError(f'cipher type {cipher.enc_type} is not symmetric')
        if enc is None:
            organization_key = self.vaultwarden_organizations[organization_id]['key']
            enc = organization_key[:32]
            mac = organization_key[32:]
        # verify the MAC; explicit raise (not assert) so the check survives
        # python -O, and constant-time comparison to avoid a timing oracle
        expected_mac = hmac_new(mac,
                                cipher.iv + cipher.ct,
                                sha256,
                                ).digest()
        if not compare_digest(cipher.mac, expected_mac):
            raise AssertionError('cipher string MAC verification failed')
        # decrypt the content
        aes = AES.new(enc,
                      AES.MODE_CBC,
                      cipher.iv,
                      )
        plaintext = aes.decrypt(cipher.ct)
        if not plaintext:
            return plaintext
        # remove PKCS#7 padding from payload, see RFC 5652
        # https://tools.ietf.org/html/rfc5652#section-6.3
        pad_len = plaintext[-1]
        padding = bytes([pad_len] * pad_len)
        # data without valid padding is returned unchanged
        if pad_len and plaintext[-pad_len:] == padding:
            plaintext = plaintext[:-pad_len]
        return plaintext

    def decrypt_asymmetric(self,
                           cipher: 'CipherString',
                           ) -> bytes:
        """Decrypt ``cipher.ct`` with the user's RSA private key (OAEP).

        The private key is itself decrypted with the user's symmetric key,
        so :meth:`load_organizations` must have been called before.
        """
        private_key = self.decrypt(self.vaultwarden_login['PrivateKey'])
        oaep = PKCS1_OAEP.new(RSA.importKey(private_key))
        return oaep.decrypt(cipher.ct)

    def encrypt_symmetric(self,
                          content: bytes,
                          organization_id: Optional[str] = None,
                          enc: Optional[bytes] = None,
                          mac: Optional[bytes] = None,
                          ) -> str:
        """Encrypt ``content`` into a type 2 cipher string.

        :param content: bytes to encrypt; PKCS#7 padding is added here
        :param organization_id: used only when ``enc`` is ``None``, to pick
            the stored 64-byte key (32 bytes enc + 32 bytes mac)
        :param enc: explicit AES key
        :param mac: explicit HMAC key, expected together with ``enc``
        :return: ``"2.<iv>|<ct>|<mac>"`` with base64 encoded parts
        """
        iv = token_bytes(16)
        if enc is None:
            organization_key = self.vaultwarden_organizations[organization_id]['key']
            enc = organization_key[:32]
            mac = organization_key[32:]
        aes = AES.new(enc,
                      AES.MODE_CBC,
                      iv,
                      )
        # always pads: a full 16-byte block is added to aligned content
        pad_len = 16 - len(content) % 16
        ct = aes.encrypt(content + bytes([pad_len] * pad_len))
        digest = hmac_new(mac,
                          iv + ct,
                          sha256,
                          ).digest()
        return f"2.{b64encode(iv).decode()}|{b64encode(ct).decode()}|{b64encode(digest).decode()}"

    def encrypt_asymmetric(self,
                           plaintext: bytes,
                           key: bytes,
                           ) -> str:
        """Encrypt ``plaintext`` with RSA-OAEP into a type 4 cipher string.

        :param plaintext: bytes to encrypt
        :param key: RSA key in a format accepted by ``RSA.importKey``
        :return: ``"4.<base64 ciphertext>"``
        """
        rsa_key = RSA.importKey(key)
        encrypted = PKCS1_OAEP.new(rsa_key).encrypt(plaintext)
        return f"4.{b64encode(encrypted).decode()}"

    def _build_url(self,
                   url: str,
                   ) -> str:
        """Join the base URL and ``url`` with exactly one slash between."""
        return f"{self.vaultwarden_url.rstrip('/')}/{url.lstrip('/')}"

    @staticmethod
    def _parse_response(resp) -> Union[dict, list, str]:
        """Return the decoded JSON body, or the raw text if it is not JSON."""
        try:
            return resp.json()
        except ValueError:
            # JSON decoding errors raised by requests derive from ValueError
            return resp.text

    @staticmethod
    def _is_error(response) -> bool:
        """Tell whether ``response`` is a server error object.

        Only a dict whose ``'Object'`` value is ``'error'`` qualifies; a
        plain text body is never treated as an error object.
        """
        return isinstance(response, dict) and response.get('Object') == 'error'

    def get(self,
            url: str,
            ) -> Union[dict, list, str]:
        """GET ``url`` and return the parsed response.

        :raises AssertionError: if the status code is not 200
        """
        full_url = self._build_url(url)
        with session() as req:
            resp = req.get(full_url, headers=self._get_headers())
            if resp.status_code != 200:
                raise AssertionError(f'unable to get url {full_url}: {resp.text}')
            return self._parse_response(resp)

    def _post(self,
              url: str,
              data: Union[str, dict],
              ) -> Union[dict, list, str]:
        """POST ``data`` to ``url`` and return the parsed response.

        :param data: body passed unchanged to ``requests`` (a JSON string,
            or a dict that requests form-encodes)
        :raises AssertionError: if the status code is not 200
        """
        full_url = self._build_url(url)
        with session() as req:
            resp = req.post(full_url,
                            data=data,
                            headers=self._get_headers(),
                            )
            if resp.status_code != 200:
                raise AssertionError(f'unable to post to url {full_url} with data {data}: {resp.text}')
            return self._parse_response(resp)

    def _put(self,
             url: str,
             data: Union[str, dict],
             ) -> Union[dict, list, str]:
        """PUT ``data`` to ``url`` and return the parsed response.

        Unlike :meth:`get` and :meth:`_post`, the status code is not checked.
        """
        with session() as req:
            resp = req.put(self._build_url(url),
                           data=data,
                           headers=self._get_headers(),
                           )
            return self._parse_response(resp)

    def _get_headers(self) -> Optional[dict]:
        """Return the bearer ``Authorization`` header, or ``None`` before login."""
        if self.vaultwarden_login is None:
            return None
        return {'Authorization': f'Bearer {self.vaultwarden_login["access_token"]}'}

    def load_organizations(self,
                           only_default: bool = False,
                           ) -> None:
        """Rebuild ``vaultwarden_organizations`` from the server.

        The entry under key ``None`` holds the user's own decrypted key.
        Unless ``only_default`` is true, every organization of the profile
        and the collection names/ids are added as well.

        :raises AssertionError: if the decrypted user key is not 64 bytes
        """
        values = self.get('/api/sync')
        enc, mac = self._get_enc_mac(self.vaultwarden_login['master_key'])
        # 'decrypt' the user_key to produce the actual keys
        cipher = cipher_string_from_str(self.vaultwarden_login['Key'])
        plaintext_userkey = self.decrypt_symmetric(cipher,
                                                   enc=enc,
                                                   mac=mac,
                                                   )
        if len(plaintext_userkey) != 64:
            raise AssertionError('decrypted user key must be 64 bytes long')
        # must be set before the loops: decrypting organization keys goes
        # through the user's private key, which needs the None entry
        self.vaultwarden_organizations = {None: {'key': plaintext_userkey, 'name': 'default', 'collections': {}}}
        if only_default:
            return
        for organization in values['Profile']['Organizations']:
            plaintext = self.decrypt(organization['Key'])
            self._add_organization(plaintext,
                                   organization,
                                   )
        for collection in values['Collections']:
            organization_id = collection['OrganizationId']
            name = self.decrypt(collection['Name'],
                                organization_id,
                                ).decode()
            self.vaultwarden_organizations[organization_id]['collections'][name] = collection['Id']

    def _get_enc_mac(self,
                     master_key: bytes,
                     ) -> Tuple[bytes, bytes]:
        """Expand ``master_key`` into 32-byte ``(enc, mac)`` keys.

        Both are HKDF-expand outputs using SHA-256, with info ``b'enc'``
        and ``b'mac'`` respectively.
        """
        enc = hkdf_expand(master_key,
                          b'enc',
                          32,
                          sha256,
                          )
        mac = hkdf_expand(master_key,
                          b'mac',
                          32,
                          sha256,
                          )
        return enc, mac

    def _add_organization(self,
                          plaintext: bytes,
                          organization: dict,
                          ) -> None:
        """Record an organization with its decrypted key and no collections."""
        self.vaultwarden_organizations[organization['Id']] = {'name': organization['Name'],
                                                              'key': plaintext,
                                                              'collections': {},
                                                              }

    def try_to_confirm(self,
                       organization_id: str,
                       email: str,
                       ) -> Tuple[str, bool]:
        """Confirm an invited user if their account has a public key.

        :return: ``(user_id, confirmed)``; ``confirmed`` is false when the
            user has no public key or the server answered with an error
        :raises Exception: if ``email`` is not a member of the organization
        """
        # user is now in organization
        user = self.get_user_informations(organization_id,
                                          email,
                                          )
        # without a public key the organization key cannot be shared
        if not user['public_key']:
            return user['user_id'], False
        # if account exists now, confirm it
        key = self.encrypt_asymmetric(self.vaultwarden_organizations[organization_id]['key'],
                                      user['public_key'],
                                      )
        confirmed = self._post(f'api/organizations/{organization_id}/users/{user["user_id"]}/confirm',
                               dumps({"key": key}),
                               )
        return user['user_id'], not self._is_error(confirmed)

    def get_user_informations(self,
                              organization_id: str,
                              email: str,
                              ) -> dict:
        """Look up a member of an organization by email.

        :return: ``{'user_id': ..., 'public_key': ...}`` where ``user_id``
            is the member's ``Id`` field and ``public_key`` is the decoded
            key bytes or ``None`` when the server reports none
        :raises Exception: if no member has this email
        """
        users = self.get(f'/api/organizations/{organization_id}/users')
        for user in users['Data']:
            if user['Email'] != email:
                continue
            # the public key is looked up with 'UserId', while the returned
            # identifier is the membership 'Id'
            user_public_key = self.get(f'/api/users/{user["UserId"]}/public-key')
            if user_public_key['PublicKey']:
                public_key = b64decode(user_public_key['PublicKey'])
            else:
                public_key = None
            return {'user_id': user['Id'],
                    'public_key': public_key,
                    }
        raise Exception(f'unknow email {email} in organization id {organization_id}')

    def create_organization(self,
                            email: str,
                            organization_name: str,
                            ) -> str:
        """Create an organization and reload the organization cache.

        A fresh 64-byte key is generated, RSA-encrypted using the user's
        key, and used to encrypt the default collection name.

        :param email: billing email of the organization
        :param organization_name: organization name, also used as the name
            of its default collection
        :return: the ``Id`` of the created organization
        """
        private_key = self.decrypt(self.vaultwarden_login['PrivateKey'])
        token = token_bytes(64)
        key = self.encrypt_asymmetric(token,
                                      private_key,
                                      )
        # default collection_name is organization_name
        data = {"key": key,
                "collectionName": self.encrypt_symmetric(organization_name.encode(),
                                                         enc=token[:32],
                                                         mac=token[32:],
                                                         ),
                "name": organization_name,
                "billingEmail": email,
                "planType": 0,
                }
        organization = self._post('api/organizations',
                                  dumps(data),
                                  )
        # reload everything so the new organization and its default
        # collection appear in the cache
        self.load_organizations()
        return organization['Id']

    def invite(self,
               organization_id: str,
               email: str,
               ) -> None:
        """Invite ``email`` with read-only access to all cached collections.

        Only the collections currently present in
        ``vaultwarden_organizations[organization_id]`` are granted.
        """
        collection_ids = self.vaultwarden_organizations[organization_id]['collections'].values()
        data = {'emails': [email],
                'collections': [{'id': collection_id,
                                 'readOnly': True,
                                 'hidePasswords': False,
                                 }
                                for collection_id in collection_ids],
                'accessAll': False,
                # NOTE(review): presumably the regular "user" role - confirm
                'type': 2,
                }
        self._post(f'api/organizations/{organization_id}/users/invite',
                   dumps(data),
                   )

    def create_collection(self,
                          organization_id: str,
                          collection_name: str,
                          user_id: Optional[str] = None,
                          ) -> str:
        """Create a collection and record it in the organization cache.

        :param collection_name: clear-text name, encrypted with the
            organization key before being sent
        :param user_id: when given, this user is added to the collection
            through :meth:`inscript_collection`
        :return: the ``Id`` of the created collection
        """
        data = {"groups": [],
                "name": self.encrypt_symmetric(collection_name.encode(),
                                               organization_id,
                                               ),
                }
        collection = self._post(f'api/organizations/{organization_id}/collections',
                                dumps(data),
                                )
        collection_id = collection['Id']
        self.vaultwarden_organizations[organization_id]['collections'][collection_name] = collection_id
        if user_id:
            self.inscript_collection(organization_id,
                                     collection_id,
                                     user_id,
                                     )
        return collection_id

    def inscript_collection(self,
                            organization_id: str,
                            collection_id: str,
                            user_id: str,
                            ) -> None:
        """Set ``user_id`` as the read-only user list of a collection."""
        data = [{'id': user_id,
                 'readOnly': True,
                 'hidePasswords': False,
                 }]
        self._put(f'api/organizations/{organization_id}/collections/{collection_id}/users',
                  dumps(data),
                  )

    def store_password(self,
                       organization_id: str,
                       collection_id: str,
                       name: str,
                       username: str,
                       password: str,
                       uris: Optional[list] = None,
                       ) -> None:
        """Create a cipher and store it in a shared collection.

        ``name``, ``username`` and ``password`` are encrypted with the
        organization key. ``uris`` is sent unchanged.
        """
        # FIXME uris are encoded
        def encrypted(value: str) -> str:
            return self.encrypt_symmetric(value.encode(),
                                          organization_id,
                                          )

        data = {"cipher": {"type": 1,
                           "folderId": None,
                           "organizationId": organization_id,
                           "name": encrypted(name),
                           "notes": None,
                           "favorite": False,
                           "login": {"response": None,
                                     "uris": uris,
                                     "username": encrypted(username),
                                     "password": encrypted(password),
                                     "passwordRevisionDate": None,
                                     "totp": None,
                                     },
                           },
                "collectionIds": [collection_id],
                }
        self._post('api/ciphers/admin',
                   dumps(data),
                   )

    def get_password(self,
                     organization_id: str,
                     collection_name: str,
                     name: str,
                     username: str,
                     ) -> Optional[str]:
        """Return the password of the first matching cipher.

        A cipher matches when it belongs to the named collection and its
        decrypted name and username equal ``name`` and ``username``.

        :return: the decrypted password, or ``None`` when the collection is
            not in the cache or no cipher matches
        """
        collections = self.vaultwarden_organizations[organization_id]['collections']
        if collection_name not in collections:
            return None
        collection_id = collections[collection_name]
        ciphers = self.get(f'api/ciphers/organization-details?organizationId={organization_id}')
        for cipher in ciphers['Data']:
            if collection_id not in cipher['CollectionIds']:
                continue
            cipher_data = cipher['Data']
            if self.decrypt(cipher_data['Name'], organization_id).decode() != name:
                continue
            if self.decrypt(cipher_data['Username'], organization_id).decode() != username:
                continue
            return self.decrypt(cipher_data['Password'], organization_id).decode()
        return None