test for vaultwarden
This commit is contained in:
parent
57c108aea0
commit
b695cf1f99
6 changed files with 601 additions and 11 deletions
|
@ -6,6 +6,7 @@
|
|||
<file>/etc/pki/ca-trust/source/anchors/ca_InternalReverseProxy.crt</file>
|
||||
<file engine="none" source="tmpfile-vaultwarden.conf">/tmpfiles.d/0vaultwarden.conf</file>
|
||||
<file source="vaultwarden_config.env">/etc/vaultwarden/config.env</file>
|
||||
<file>/tests/vaultwarden.yml</file>
|
||||
</service>
|
||||
</services>
|
||||
<variables>
|
||||
|
@ -27,13 +28,13 @@
|
|||
</variable>
|
||||
<variable name="vaultwarden_admin_email" type="mail" description="Adresse courriel de l'utilisateur Risotto" mandatory="True"/>
|
||||
<variable name="vaultwarden_admin_password" type="password" description="Mot de passe de l'utilisateur Risotto" auto_save="False" hidden="True"/>
|
||||
<variable name="vaultwarden_device_identifier" description="Identifiant de l'appareil se connectant" auto_save="False" hidden="True"/>
|
||||
<variable name="vaultwarden_length" type="number" description="Taille par défaut du mot de passe">
|
||||
<value>20</value>
|
||||
</variable>
|
||||
<variable name="vaultwarden_org_name" description="Nom de l'organisation lors de l'envoi des invitations" mandatory="True">
|
||||
<value>Vaultwarden</value>
|
||||
</variable>
|
||||
<variable name="vaultwarden_test_device_identifier" description="Identifiant de test de l'appareil se connectant" hidden="True"/>
|
||||
</family>
|
||||
<family name="postgresql" description="PostgreSQL">
|
||||
<variable name="pg_client_key_owner" redefine="True">
|
||||
|
@ -50,8 +51,9 @@
|
|||
<target>vaultwarden_admin_password</target>
|
||||
<param name="hide" type="variable">hide_secret</param>
|
||||
</fill>
|
||||
<fill name="gen_uuid">
|
||||
<target>vaultwarden_device_identifier</target>
|
||||
<fill name="get_uuid">
|
||||
<param name="server_name" type="variable">domain_name_eth0</param>
|
||||
<target>vaultwarden_test_device_identifier</target>
|
||||
</fill>
|
||||
<fill name="calc_value">
|
||||
<param type="boolean">True</param>
|
||||
|
|
|
@ -1,6 +0,0 @@
|
|||
from uuid import uuid4 as _uuid4
|
||||
|
||||
|
||||
def gen_uuid():
|
||||
return str(_uuid4())
|
||||
|
|
@ -1,6 +1,22 @@
|
|||
import __main__
|
||||
from os.path import dirname as _dirname, abspath as _abspath, join as _join, isfile as _isfile, isdir as _isdir
|
||||
from os import makedirs as _makedirs
|
||||
from uuid import uuid4 as _uuid4
|
||||
|
||||
|
||||
def gen_uuid():
|
||||
return str(_uuid4())
|
||||
_HERE = _dirname(_abspath(__main__.__file__))
|
||||
_PASSWORD_DIR = _join(_HERE, 'password')
|
||||
|
||||
|
||||
def get_uuid(server_name: str) -> str:
|
||||
dir_name = _join(_PASSWORD_DIR, server_name)
|
||||
if not _isdir(dir_name):
|
||||
_makedirs(dir_name)
|
||||
file_name = _join(dir_name, 'uuid')
|
||||
if not _isfile(file_name):
|
||||
uuid = str(_uuid4())
|
||||
with open(file_name, 'w') as fh:
|
||||
fh.write(uuid)
|
||||
with open(file_name, 'r') as fh:
|
||||
file_content = fh.read().strip()
|
||||
return file_content
|
||||
|
|
7
seed/vaultwarden/templates/vaultwarden.yml
Normal file
7
seed/vaultwarden/templates/vaultwarden.yml
Normal file
|
@ -0,0 +1,7 @@
|
|||
url: https://%%revprox_client_external_domainname%%{revprox_client_location[0]}
|
||||
%set %%username='rougail_test@silique.fr'
|
||||
username: %%username
|
||||
password: %%get_password(server_name=%%domain_name_eth0, username=%%username, description='test', type="cleartext", hide=%%hide_secret, temporary=False)
|
||||
privkey: %%srv_dir/vaultwarden/rsa_key.pem
|
||||
uuid: %%vaultwarden_test_device_identifier
|
||||
revprox_ip: %%revprox_client_server_ip
|
40
seed/vaultwarden/tests/test_vaultwarden.py
Normal file
40
seed/vaultwarden/tests/test_vaultwarden.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
from yaml import load, SafeLoader
|
||||
from os import environ
|
||||
from mookdns import MookDns
|
||||
|
||||
from vaultwarden import VaultWarden
|
||||
|
||||
|
||||
def test_vaultwarden_login():
|
||||
conf_file = f'{environ["MACHINE_TEST_DIR"]}/vaultwarden.yml'
|
||||
with open(conf_file) as yaml:
|
||||
data = load(yaml, Loader=SafeLoader)
|
||||
with MookDns(data['revprox_ip']):
|
||||
vaultwarden = VaultWarden(data['url'], data['username'], data['uuid'], data['privkey'])
|
||||
if 'FIRST_RUN' in environ:
|
||||
vaultwarden.register(data['password'])
|
||||
vaultwarden.login(data['password'])
|
||||
vaultwarden.load_organizations()
|
||||
|
||||
|
||||
def test_vaultwarden_collection():
|
||||
conf_file = f'{environ["MACHINE_TEST_DIR"]}/vaultwarden.yml'
|
||||
with open(conf_file) as yaml:
|
||||
data = load(yaml, Loader=SafeLoader)
|
||||
with MookDns(data['revprox_ip']):
|
||||
vaultwarden = VaultWarden(data['url'], data['username'], data['uuid'], data['privkey'])
|
||||
vaultwarden.login(data['password'])
|
||||
vaultwarden.load_organizations()
|
||||
if 'FIRST_RUN' in environ:
|
||||
organization_id = vaultwarden.create_organization(data['username'],
|
||||
'test_organization',
|
||||
)
|
||||
vaultwarden.create_collection(organization_id,
|
||||
'test_collection',
|
||||
)
|
||||
assert len(vaultwarden.vaultwarden_organizations) == 2
|
||||
for org in vaultwarden.vaultwarden_organizations:
|
||||
if org is not None:
|
||||
assert vaultwarden.vaultwarden_organizations[org]['name'] == 'test_organization'
|
||||
assert len(vaultwarden.vaultwarden_organizations[org]['collections']) == 2
|
||||
assert set(vaultwarden.vaultwarden_organizations[org]['collections']) == {'test_organization', 'test_collection'}
|
531
seed/vaultwarden/tests/vaultwarden.py
Normal file
531
seed/vaultwarden/tests/vaultwarden.py
Normal file
|
@ -0,0 +1,531 @@
|
|||
from typing import Union, Tuple, Optional
|
||||
#python3-crypto
|
||||
from Cryptodome.PublicKey import RSA
|
||||
from Cryptodome.Cipher import AES, PKCS1_OAEP
|
||||
from hmac import new as hmac_new
|
||||
from secrets import token_bytes
|
||||
|
||||
from time import time
|
||||
from json import dumps
|
||||
from hashlib import pbkdf2_hmac, sha256
|
||||
#from aiohttp import ClientSession
|
||||
from requests import session
|
||||
from base64 import b64encode, b64decode
|
||||
from hkdf import hkdf_expand
|
||||
from collections import namedtuple
|
||||
from os.path import isfile
|
||||
from jwt import encode as jwt_encode, decode as jwt_decode
|
||||
|
||||
|
||||
|
||||
#BITWARDEN_PRIVATE_KEY = '/var/lib/vaultwarden_rs/rsa_key.der'
|
||||
|
||||
cipher_string_fields = {
|
||||
'enc_type': lambda enc_type,iv,mac,ct: int(enc_type),
|
||||
'iv': lambda enc_type,iv,mac,ct: iv,
|
||||
'mac': lambda enc_type,iv,mac,ct: mac,
|
||||
'ct': lambda enc_type,iv,mac,ct: ct,
|
||||
}
|
||||
CipherString = namedtuple('CipherString', cipher_string_fields.keys())
|
||||
|
||||
|
||||
# support pulling apart a VaultWarden 'CipherString' from the following
|
||||
# - cipher string: "<enc_type>.<iv>|<ct>|<mac>"
|
||||
def cipher_string_from_str(cipher_string: str) -> CipherString:
|
||||
enc_type, data = cipher_string.split('.', 1)
|
||||
if enc_type == '2':
|
||||
iv, ct, mac = (b64decode(sdata) for sdata in data.split('|', 2))
|
||||
|
||||
d = { k: fn(enc_type, iv, mac, ct) for k,fn in cipher_string_fields.items() }
|
||||
else:
|
||||
iv, mac = None, None
|
||||
ct = b64decode(data)
|
||||
d = { k: fn(enc_type, iv, mac, ct) for k, fn in cipher_string_fields.items() }
|
||||
return CipherString(**d)
|
||||
|
||||
|
||||
class VaultWarden:
|
||||
def __init__(self,
|
||||
url: str,
|
||||
email: str,
|
||||
uuid: str,
|
||||
vaultwarden_key: str,
|
||||
) -> None:
|
||||
self.vaultwarden_url = url
|
||||
self.vaultwarden_email = email.lower()
|
||||
self.vaultwarden_uuid = uuid
|
||||
self.vaultwarden_login = None
|
||||
self.vaultwarden_organizations = None
|
||||
self.vaultwarden_key = vaultwarden_key
|
||||
|
||||
def register(self,
|
||||
password: str,
|
||||
valid: bool=True,
|
||||
) -> None:
|
||||
iterations = self.get_iterations()
|
||||
master_key, hash_password = self.hash_password(password,
|
||||
iterations,
|
||||
)
|
||||
# generate symmetric key
|
||||
token = token_bytes(64)
|
||||
enc, mac = self._get_enc_mac(master_key)
|
||||
key = self.encrypt_symmetric(token,
|
||||
enc=enc,
|
||||
mac=mac,
|
||||
)
|
||||
# generate asymmetric key
|
||||
asym_key = RSA.generate(2048)
|
||||
enc_private_key = self.encrypt_symmetric(asym_key.exportKey('DER', pkcs=8),
|
||||
enc=token[:32],
|
||||
mac=token[32:],
|
||||
)
|
||||
public_key = b64encode(asym_key.publickey().exportKey('DER')).decode()
|
||||
data = {'name': self.vaultwarden_email.split('@')[0],
|
||||
'email': self.vaultwarden_email,
|
||||
'masterPasswordHash': hash_password,
|
||||
'masterPasswordHint': None,
|
||||
'key': key,
|
||||
'kdf': 0,
|
||||
'kdfIterations': iterations,
|
||||
'referenceId': None,
|
||||
'keys': {
|
||||
'publicKey': public_key,
|
||||
'encryptedPrivateKey': enc_private_key
|
||||
}
|
||||
}
|
||||
register = self._post('api/accounts/register',
|
||||
dumps(data),
|
||||
)
|
||||
if 'Object' in register and register['Object'] == 'error':
|
||||
if register["ErrorModel"]['Message'] == 'User already exists':
|
||||
return
|
||||
raise Exception(register["ErrorModel"]["Message"])
|
||||
if valid and isfile(self.vaultwarden_key):
|
||||
self.login(password)
|
||||
# values = self.get('/api/sync')
|
||||
# user_id = values['Profile']['Id']
|
||||
user_id = jwt_decode(self.vaultwarden_login['access_token'],
|
||||
algorithm="RS256",
|
||||
#pyjwt 1
|
||||
verify=False,
|
||||
#pyjwt 2
|
||||
options={"verify_signature": False},
|
||||
)['sub']
|
||||
now = int(time())
|
||||
url = self.vaultwarden_url
|
||||
if url[-1] == '/':
|
||||
url = url[:-1]
|
||||
data = {'nbf': now,
|
||||
'exp': now + 432000,
|
||||
'iss': f'{url}|verifyemail',
|
||||
'sub': user_id,
|
||||
}
|
||||
with open(self.vaultwarden_key, 'rb') as private_key_fh:
|
||||
private_key = RSA.importKey(private_key_fh.read()).exportKey('PEM')
|
||||
token = jwt_encode(data, private_key, algorithm="RS256")
|
||||
if isinstance(token, bytes):
|
||||
tocken = token.decode()
|
||||
data = {'userId': user_id,
|
||||
'token': token,
|
||||
}
|
||||
self._post('api/accounts/verify-email-token', dumps(data))
|
||||
|
||||
def login(self,
|
||||
password: str,
|
||||
) -> None:
|
||||
iterations = self.get_iterations()
|
||||
master_key, hash_password = self.hash_password(password,
|
||||
iterations,
|
||||
)
|
||||
data = {'grant_type': 'password',
|
||||
'username': self.vaultwarden_email,
|
||||
'password': hash_password,
|
||||
'scope': 'api offline_access',
|
||||
'client_id': 'desktop',
|
||||
'device_type': 7,
|
||||
'device_identifier': self.vaultwarden_uuid,
|
||||
'device_name': 'risotto',
|
||||
}
|
||||
vaultwarden_login = self._post('identity/connect/token', data)
|
||||
if 'Object' in vaultwarden_login and vaultwarden_login['Object'] == 'error':
|
||||
raise Exception(f'unable to log to VaultWarden: {vaultwarden_login["ErrorModel"]["Message"]}')
|
||||
self.vaultwarden_login = vaultwarden_login
|
||||
self.vaultwarden_login['master_key'] = master_key
|
||||
self.vaultwarden_login['hash_password'] = hash_password
|
||||
|
||||
def get_iterations(self):
|
||||
data = self._post('api/accounts/prelogin', dumps({'email': self.vaultwarden_email}))
|
||||
return data['KdfIterations']
|
||||
|
||||
def hash_password(self,
|
||||
password: str,
|
||||
iterations: int,
|
||||
) -> str:
|
||||
master_key = pbkdf2_hmac('sha256',
|
||||
password.encode(),
|
||||
self.vaultwarden_email.encode(),
|
||||
iterations,
|
||||
)
|
||||
passwd = pbkdf2_hmac('sha256',
|
||||
master_key,
|
||||
password.encode(),
|
||||
1,
|
||||
)
|
||||
return master_key, b64encode(passwd).decode()
|
||||
|
||||
def decrypt(self,
|
||||
cipher_string: str,
|
||||
organization_id: str=None,
|
||||
) -> None:
|
||||
cipher = cipher_string_from_str(cipher_string)
|
||||
if cipher.enc_type == 2:
|
||||
return self.decrypt_symmetric(cipher,
|
||||
organization_id,
|
||||
)
|
||||
elif cipher.enc_type == 4:
|
||||
if organization_id:
|
||||
raise Exception('cipher type {cipher.enc_type} cannot have organization_id')
|
||||
return self.decrypt_asymmetric(cipher)
|
||||
raise Exception(f'Unknown cipher type {cipher.enc_type}')
|
||||
|
||||
def decrypt_symmetric(self,
|
||||
cipher: str,
|
||||
organization_id: str=None,
|
||||
enc: str=None,
|
||||
mac: str=None,
|
||||
) -> bytes:
|
||||
# i.e: AesCbc256_HmacSha256_B64 (jslib/src/enums/encryptionType.ts)
|
||||
assert cipher.enc_type == 2
|
||||
if enc is None:
|
||||
enc = self.vaultwarden_organizations[organization_id]['key'][:32]
|
||||
mac = self.vaultwarden_organizations[organization_id]['key'][32:]
|
||||
# verify the MAC
|
||||
cmac = hmac_new(mac,
|
||||
cipher.iv + cipher.ct,
|
||||
sha256,
|
||||
)
|
||||
assert cipher.mac == cmac.digest()
|
||||
|
||||
# decrypt the content
|
||||
c = AES.new(enc,
|
||||
AES.MODE_CBC,
|
||||
cipher.iv,
|
||||
)
|
||||
plaintext = c.decrypt(cipher.ct)
|
||||
|
||||
# remove PKCS#7 padding from payload, see RFC 5652
|
||||
# https://tools.ietf.org/html/rfc5652#section-6.3
|
||||
pad_len = plaintext[-1]
|
||||
padding = bytes([pad_len] * pad_len)
|
||||
if plaintext[-pad_len:] == padding:
|
||||
plaintext = plaintext[:-pad_len]
|
||||
return plaintext
|
||||
|
||||
def decrypt_asymmetric(self,
|
||||
cipher: str,
|
||||
) -> str:
|
||||
private_key = self.decrypt(self.vaultwarden_login['PrivateKey'])
|
||||
c = PKCS1_OAEP.new(RSA.importKey(private_key))
|
||||
return c.decrypt(cipher.ct)
|
||||
|
||||
def encrypt_symmetric(self,
|
||||
content: bytes,
|
||||
organization_id: str=None,
|
||||
enc: str=None,
|
||||
mac: str=None,
|
||||
) -> None:
|
||||
iv = token_bytes(16)
|
||||
if enc is None:
|
||||
enc = self.vaultwarden_organizations[organization_id]['key'][:32]
|
||||
mac = self.vaultwarden_organizations[organization_id]['key'][32:]
|
||||
c = AES.new(enc,
|
||||
AES.MODE_CBC,
|
||||
iv,
|
||||
)
|
||||
pad_len = 16 - len(content) % 16
|
||||
padding = bytes([ pad_len ] * pad_len)
|
||||
ct = c.encrypt(content + padding)
|
||||
cmac = hmac_new(mac,
|
||||
iv + ct,
|
||||
sha256,
|
||||
)
|
||||
return f"2.{b64encode(iv).decode()}|{b64encode(ct).decode()}|{b64encode(cmac.digest()).decode()}"
|
||||
|
||||
def encrypt_asymmetric(self,
|
||||
plaintext: str,
|
||||
key: str,
|
||||
) -> str:
|
||||
rsa_key = RSA.importKey(key)
|
||||
cipher = PKCS1_OAEP.new(rsa_key).encrypt(plaintext)
|
||||
b64_cipher = b64encode(cipher).decode()
|
||||
return f"4.{b64_cipher}"
|
||||
|
||||
def get(self,
|
||||
url: str,
|
||||
) -> None:
|
||||
with session() as req:
|
||||
resp = req.get(self.vaultwarden_url + url, headers=self._get_headers())
|
||||
assert resp.status_code == 200
|
||||
try:
|
||||
response = resp.json()
|
||||
except:
|
||||
response = resp.text
|
||||
return response
|
||||
|
||||
def _post(self,
|
||||
url: str,
|
||||
data: dict,
|
||||
) -> None:
|
||||
with session() as req:
|
||||
resp = req.post(self.vaultwarden_url + url,
|
||||
data=data,
|
||||
headers=self._get_headers(),
|
||||
)
|
||||
assert resp.status_code == 200, f'unable to post to url {self.vaultwarden_url}{url} with data {data}: {resp.text}'
|
||||
try:
|
||||
response = resp.json()
|
||||
except:
|
||||
response = resp.text
|
||||
return response
|
||||
|
||||
def _put(self,
|
||||
url: str,
|
||||
data: dict,
|
||||
) -> None:
|
||||
with session() as req:
|
||||
resp = req.put(self.vaultwarden_url + url,
|
||||
data=data,
|
||||
headers=self._get_headers(),
|
||||
)
|
||||
try:
|
||||
response = resp.json()
|
||||
except:
|
||||
response = resp.text
|
||||
return response
|
||||
|
||||
def _get_headers(self,
|
||||
) -> None:
|
||||
if self.vaultwarden_login == None:
|
||||
return None
|
||||
return {'Authorization': f'Bearer {self.vaultwarden_login["access_token"]}'}
|
||||
|
||||
def load_organizations(self,
|
||||
only_default: bool=False,
|
||||
) -> None:
|
||||
values = self.get('/api/sync')
|
||||
enc, mac = self._get_enc_mac(self.vaultwarden_login['master_key'])
|
||||
# 'decrypt' the user_key to produce the actual keys
|
||||
cipher = cipher_string_from_str(self.vaultwarden_login['Key'])
|
||||
plaintext_userkey = self.decrypt_symmetric(cipher,
|
||||
enc=enc,
|
||||
mac=mac,
|
||||
)
|
||||
assert len(plaintext_userkey) == 64
|
||||
self.vaultwarden_organizations = {None: {'key': plaintext_userkey, 'name': 'default', 'collections': {}}}
|
||||
if not only_default:
|
||||
for organization in values['Profile']['Organizations']:
|
||||
plaintext = self.decrypt(organization['Key'])
|
||||
self._add_organization(plaintext,
|
||||
organization,
|
||||
)
|
||||
for collection in values['Collections']:
|
||||
name = self.decrypt(collection['Name'],
|
||||
collection['OrganizationId'],
|
||||
).decode()
|
||||
self.vaultwarden_organizations[collection['OrganizationId']]['collections'][name] = collection['Id']
|
||||
|
||||
def _get_enc_mac(self,
|
||||
master_key: str,
|
||||
) -> tuple:
|
||||
enc = hkdf_expand(master_key,
|
||||
b'enc',
|
||||
32,
|
||||
sha256,
|
||||
)
|
||||
mac = hkdf_expand(master_key,
|
||||
b'mac',
|
||||
32,
|
||||
sha256,
|
||||
)
|
||||
return enc, mac
|
||||
|
||||
def _add_organization(self,
|
||||
plaintext: bytes,
|
||||
organization: dict,
|
||||
) -> None:
|
||||
organization_id = organization['Id']
|
||||
self.vaultwarden_organizations[organization_id] = {'name': organization['Name'], 'key': plaintext, 'collections': {}}
|
||||
|
||||
def try_to_confirm(self,
|
||||
organization_id,
|
||||
email,
|
||||
) -> bool:
|
||||
# user is now in organization
|
||||
user = self.get_user_informations(organization_id,
|
||||
email,
|
||||
)
|
||||
|
||||
# if account exists now, confirm it
|
||||
if user['public_key']:
|
||||
key = self.encrypt_asymmetric(self.vaultwarden_organizations[organization_id]['key'],
|
||||
user['public_key'],
|
||||
)
|
||||
data = {"key": key}
|
||||
confirmed = self._post(f'api/organizations/{organization_id}/users/{user["user_id"]}/confirm',
|
||||
dumps(data),
|
||||
)
|
||||
return user['user_id'], 'Object' not in confirmed or confirmed['Object'] != 'error'
|
||||
return user['user_id'], False
|
||||
|
||||
def get_user_informations(self,
|
||||
organization_id: str,
|
||||
email: str,
|
||||
) -> None:
|
||||
users = self.get(f'/api/organizations/{organization_id}/users')
|
||||
for user in users['Data']:
|
||||
if user['Email'] == email:
|
||||
user_public_key = self.get(f'/api/users/{user["UserId"]}/public-key')
|
||||
if not user_public_key['PublicKey']:
|
||||
public_key = None
|
||||
else:
|
||||
public_key = b64decode(user_public_key['PublicKey'])
|
||||
return {'user_id': user['Id'],
|
||||
'public_key': public_key,
|
||||
}
|
||||
raise Exception(f'unknow email {email} in organization id {organization_id}')
|
||||
|
||||
def create_organization(self,
|
||||
email: str,
|
||||
organization_name: str,
|
||||
) -> None:
|
||||
private_key = self.decrypt(self.vaultwarden_login['PrivateKey'])
|
||||
token = token_bytes(64)
|
||||
key = self.encrypt_asymmetric(token,
|
||||
private_key,
|
||||
)
|
||||
# defaut collection_name is organization_name
|
||||
data = {
|
||||
"key": key,
|
||||
"collectionName": self.encrypt_symmetric(organization_name.encode(),
|
||||
enc=token[:32],
|
||||
mac=token[32:],
|
||||
),
|
||||
"name": organization_name,
|
||||
"billingEmail": email,
|
||||
"planType": 0,
|
||||
}
|
||||
organization = self._post('api/organizations',
|
||||
dumps(data),
|
||||
)
|
||||
self.load_organizations()
|
||||
#self._add_organization(token,
|
||||
# organization,
|
||||
# )
|
||||
return organization['Id']
|
||||
|
||||
def invite(self,
|
||||
organization_id: str,
|
||||
email: str,
|
||||
) -> bool:
|
||||
data = {'emails': [email],
|
||||
'collections': [],
|
||||
'accessAll': False,
|
||||
'type': 2,
|
||||
}
|
||||
for collection_id in self.vaultwarden_organizations[organization_id]['collections'].values():
|
||||
data['collections'].append({'id': collection_id,
|
||||
'readOnly': True,
|
||||
'hidePasswords': False,
|
||||
})
|
||||
self._post(f'api/organizations/{organization_id}/users/invite',
|
||||
dumps(data),
|
||||
)
|
||||
|
||||
def create_collection(self,
|
||||
organization_id: str,
|
||||
collection_name: str,
|
||||
user_id: str=None,
|
||||
) -> None:
|
||||
data = {"groups": [],
|
||||
"name": self.encrypt_symmetric(collection_name.encode(),
|
||||
organization_id,
|
||||
),
|
||||
}
|
||||
collection = self._post(f'api/organizations/{organization_id}/collections',
|
||||
dumps(data),
|
||||
)
|
||||
self.vaultwarden_organizations[organization_id]['collections'][collection_name] = collection['Id']
|
||||
if user_id:
|
||||
self.inscript_collection(organization_id,
|
||||
collection['Id'],
|
||||
user_id,
|
||||
)
|
||||
return collection['Id']
|
||||
|
||||
def inscript_collection(self,
|
||||
organization_id: str,
|
||||
collection_id: str,
|
||||
user_id: str,
|
||||
) -> None:
|
||||
data = [{'id': user_id,
|
||||
'readOnly': True,
|
||||
'hidePasswords': False,
|
||||
}]
|
||||
self._put(f'api/organizations/{organization_id}/collections/{collection_id}/users',
|
||||
dumps(data),
|
||||
)
|
||||
|
||||
def store_password(self,
|
||||
organization_id: str,
|
||||
collection_id: str,
|
||||
name: str,
|
||||
username: str,
|
||||
password: str,
|
||||
uris: list=None,
|
||||
) -> None:
|
||||
"""create a cipher et store it in a share collection
|
||||
"""
|
||||
# FIXME uris are encoded
|
||||
data = {"cipher": {
|
||||
"type": 1,
|
||||
"folderId": None,
|
||||
"organizationId": organization_id,
|
||||
"name": self.encrypt_symmetric(name.encode(),
|
||||
organization_id,
|
||||
),
|
||||
"notes": None,
|
||||
"favorite": False,
|
||||
"login":
|
||||
{"response": None,
|
||||
"uris": uris,
|
||||
"username": self.encrypt_symmetric(username.encode(),
|
||||
organization_id,
|
||||
),
|
||||
"password": self.encrypt_symmetric(password.encode(),
|
||||
organization_id,
|
||||
),
|
||||
"passwordRevisionDate": None,
|
||||
"totp": None,
|
||||
}
|
||||
},
|
||||
"collectionIds": [collection_id],
|
||||
}
|
||||
self._post('api/ciphers/admin',
|
||||
dumps(data),
|
||||
)
|
||||
|
||||
def get_password(self,
|
||||
organization_id: str,
|
||||
collection_name: str,
|
||||
name: str,
|
||||
username: str,
|
||||
) -> list:
|
||||
if not collection_name in self.vaultwarden_organizations[organization_id]['collections']:
|
||||
return
|
||||
collection_id = self.vaultwarden_organizations[organization_id]['collections'][collection_name]
|
||||
ciphers = self.get(f'api/ciphers/organization-details?organizationId={organization_id}')
|
||||
for cipher in ciphers['Data']:
|
||||
if collection_id in cipher['CollectionIds'] and \
|
||||
self.decrypt(cipher['Data']['Name'], organization_id).decode() == name and \
|
||||
self.decrypt(cipher['Data']['Username'], organization_id).decode() == username:
|
||||
return self.decrypt(cipher['Data']['Password'], organization_id).decode()
|
Loading…
Reference in a new issue