forked from stove/dataset
68 lines
2.3 KiB
Python
68 lines
2.3 KiB
Python
from yaml import load, SafeLoader
|
|
from os import environ
|
|
from datetime import timedelta
|
|
from time import sleep
|
|
from pytest import raises
|
|
from redis import Redis
|
|
from redis.exceptions import AuthenticationError, ResponseError
|
|
|
|
|
|
|
|
def test_redis_no_password():
    """Expect AuthenticationError when a command is sent without credentials.

    The server address is read from ``$MACHINE_TEST_DIR/redis.yml``; neither
    a username nor a password is handed to the client.
    """
    with open(f'{environ["MACHINE_TEST_DIR"]}/redis.yml') as stream:
        settings = load(stream, Loader=SafeLoader)

    anonymous = Redis(host=settings['address'], db=15)
    with raises(AuthenticationError):
        anonymous.get("Persistent")
|
|
|
|
|
|
def test_redis_wrong_password():
    """Expect ResponseError when the configured user sends a bad password.

    The address and username come from ``$MACHINE_TEST_DIR/redis.yml``; the
    password is replaced by the literal ``'a'``.
    """
    with open(f'{environ["MACHINE_TEST_DIR"]}/redis.yml') as stream:
        settings = load(stream, Loader=SafeLoader)

    address = settings['address']
    user = settings['username']
    client = Redis(address, db=15, username=user, password='a')

    with raises(ResponseError):
        client.get("Persistent")
|
|
|
|
|
|
def test_redis_wrong_user():
    """Expect ResponseError when the real password is paired with user "a".

    The address and password come from ``$MACHINE_TEST_DIR/redis.yml``; only
    the username is substituted.
    """
    with open(f'{environ["MACHINE_TEST_DIR"]}/redis.yml') as stream:
        settings = load(stream, Loader=SafeLoader)

    credentials = {"username": "a", "password": settings['password']}
    client = Redis(settings['address'], db=15, **credentials)

    with raises(ResponseError):
        client.get("Persistent")
|
|
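# FIXME: test_redis_no_user (password given, username omitted) is disabled; its commented-out body follows.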
|
|
# FIXME
|
|
# FIXME def test_redis_no_user():
|
|
# FIXME conf_file = f'{environ["MACHINE_TEST_DIR"]}/redis.yml'
|
|
# FIXME with open(conf_file) as yaml:
|
|
# FIXME data = load(yaml, Loader=SafeLoader)
|
|
# FIXME r = Redis(data['address'], db=15, password=data['password'])
|
|
# FIXME with raises(ResponseError):
|
|
# FIXME r.get("Persistent")
|
|
|
|
|
|
def test_redis_migration():
    """Check that the "Persistent" key holds ``b'yes!'``.

    When ``FIRST_RUN`` is present in the environment the key is written
    first; in every case the stored value is then read back and compared.
    Connection details come from ``$MACHINE_TEST_DIR/redis.yml``.
    """
    with open(f'{environ["MACHINE_TEST_DIR"]}/redis.yml') as stream:
        settings = load(stream, Loader=SafeLoader)

    client = Redis(
        settings['address'],
        db=15,
        username=settings['username'],
        password=settings['password'],
    )

    seeding = 'FIRST_RUN' in environ
    if seeding:
        # First run: write the marker that later runs look for.
        assert client.mset({"Persistent": "yes!"})

    stored = client.get("Persistent")
    assert stored == b'yes!'
|
|
|
|
|
|
def test_redis_set_get():
    """Store a key with a one-second expiry and check that it disappears.

    The key "runner" must exist right after it is written and must be gone
    after sleeping for one second. Connection details come from
    ``$MACHINE_TEST_DIR/redis.yml``.
    """
    with open(f'{environ["MACHINE_TEST_DIR"]}/redis.yml') as stream:
        settings = load(stream, Loader=SafeLoader)

    client = Redis(
        settings['address'],
        db=15,
        username=settings['username'],
        password=settings['password'],
    )

    ttl = timedelta(seconds=1)
    payload = "now you see me, now you don't"
    client.setex("runner", ttl, value=payload)
    assert client.exists("runner")

    # Wait out the expiry before probing for the key again.
    sleep(1)
    assert not client.exists("runner")
|