from datetime import timedelta
from os import environ
from time import sleep

from pytest import raises
from redis import Redis
from redis.exceptions import AuthenticationError, ResponseError
from yaml import load, SafeLoader


def _load_redis_config():
    """Return the parsed contents of ``$MACHINE_TEST_DIR/redis.yml``.

    The tests below read the ``address``, ``username`` and ``password``
    keys from the returned mapping.

    Raises:
        KeyError: if the ``MACHINE_TEST_DIR`` environment variable is unset.
        OSError: if the configuration file cannot be opened.
    """
    conf_file = f'{environ["MACHINE_TEST_DIR"]}/redis.yml'
    with open(conf_file) as conf:
        return load(conf, Loader=SafeLoader)


def test_redis_no_password():
    """Connecting without any credentials must fail with AuthenticationError."""
    data = _load_redis_config()
    r = Redis(data['address'], db=15)
    with raises(AuthenticationError):
        r.get("Persistent")


def test_redis_wrong_password():
    """The configured username with a bogus password must raise ResponseError."""
    data = _load_redis_config()
    r = Redis(data['address'], db=15, username=data['username'], password='a')
    with raises(ResponseError):
        r.get("Persistent")


def test_redis_wrong_user():
    """A bogus username with the configured password must raise ResponseError."""
    data = _load_redis_config()
    r = Redis(data['address'], db=15, username="a", password=data['password'])
    with raises(ResponseError):
        r.get("Persistent")


# FIXME
# FIXME
# FIXME def test_redis_no_user():
# FIXME     conf_file = f'{environ["MACHINE_TEST_DIR"]}/redis.yml'
# FIXME     with open(conf_file) as yaml:
# FIXME         data = load(yaml, Loader=SafeLoader)
# FIXME     r = Redis(data['address'], db=15, password=data['password'])
# FIXME     with raises(ResponseError):
# FIXME         r.get("Persistent")


def test_redis_migration():
    """Check that the ``Persistent`` key holds ``yes!``.

    When ``FIRST_RUN`` is present in the environment the key is written
    first; on every run the key is then read back and compared.
    NOTE(review): presumably later runs (without ``FIRST_RUN``) verify that
    the value survived a migration/upgrade of the server - confirm.
    """
    data = _load_redis_config()
    r = Redis(
        data['address'],
        db=15,
        username=data['username'],
        password=data['password'],
    )
    if 'FIRST_RUN' in environ:
        # Seed the value only on the first run; later runs must find it.
        assert r.mset({"Persistent": "yes!"})
    assert r.get("Persistent") == b'yes!'
def test_redis_set_get():
    """Check that a key stored with a TTL exists at first and then expires.

    Connects with the credentials from ``$MACHINE_TEST_DIR/redis.yml``,
    stores ``runner`` with a one-second expiry, asserts it is present,
    waits for the expiry to pass and asserts it is gone.
    """
    conf_file = f'{environ["MACHINE_TEST_DIR"]}/redis.yml'
    with open(conf_file) as conf:
        data = load(conf, Loader=SafeLoader)
    r = Redis(
        data['address'],
        db=15,
        username=data['username'],
        password=data['password'],
    )

    ttl = timedelta(seconds=1)
    r.setex("runner", ttl, value="now you see me, now you don't")
    assert r.exists("runner")

    # Sleep slightly longer than the TTL: Redis documents an expire accuracy
    # of up to about a millisecond, so checking at exactly the TTL boundary
    # can race the expiry and fail intermittently.
    sleep(ttl.total_seconds() + 0.1)
    assert not r.exists("runner")