mirror of
https://github.com/kennethreitz/replit-py.git
synced 2026-06-05 23:10:18 +00:00
use new key query param and add some tests (#13)
* first test * unescape returned keys * fix to_dict * wip * a bit of sync * dict methods * lint * oop * run the tests * concurrently * secret
This commit is contained in:
@@ -5,6 +5,7 @@ import json
|
||||
import os
|
||||
from sys import stderr
|
||||
from typing import Any, Callable, Dict, Tuple, Union
|
||||
import urllib
|
||||
|
||||
import aiohttp
|
||||
import nest_asyncio
|
||||
@@ -20,7 +21,7 @@ class AsyncJSONKey:
|
||||
you don't have to do it manually.
|
||||
"""
|
||||
|
||||
__slots__ = ("db", "key", "dtype", "get_default", "discard_bad_data")
|
||||
__slots__ = ("db", "key", "dtype", "get_default", "discard_bad_data", "do_raise")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -216,14 +217,15 @@ class AsyncReplitDb:
|
||||
Returns:
|
||||
Tuple[str]: The keys found.
|
||||
"""
|
||||
params = {"prefix": prefix, "encode": "true"}
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(self.db_url + "?prefix=" + prefix) as response:
|
||||
async with session.get(self.db_url, params=params) as response:
|
||||
response.raise_for_status()
|
||||
text = await response.text()
|
||||
if not text:
|
||||
return tuple()
|
||||
else:
|
||||
return tuple(text.split("\n"))
|
||||
return tuple(urllib.parse.unquote(k) for k in text.split("\n"))
|
||||
|
||||
async def to_dict(self, prefix: str = "") -> Dict[str, str]:
|
||||
"""Dump all data in the database into a dictionary.
|
||||
@@ -238,7 +240,7 @@ class AsyncReplitDb:
|
||||
ret = {}
|
||||
keys = await self.list(prefix=prefix)
|
||||
for i in keys:
|
||||
ret[i] = await self.view(i)
|
||||
ret[i] = await self.get(i)
|
||||
return ret
|
||||
|
||||
async def keys(self) -> Tuple[str]:
|
||||
@@ -263,6 +265,7 @@ class AsyncReplitDb:
|
||||
dtype: JSON_TYPE,
|
||||
get_default: Callable = None,
|
||||
discard_bad_data: bool = False,
|
||||
do_raise: bool = False,
|
||||
) -> AsyncJSONKey:
|
||||
"""Initialize an AsyncJSONKey instance.
|
||||
|
||||
@@ -277,6 +280,7 @@ class AsyncReplitDb:
|
||||
argument is used.
|
||||
discard_bad_data (bool): Don't prompt if bad data is read, overwrite it
|
||||
with the default. Defaults to False.
|
||||
do_raise (bool): Whether to raise exceptions when errors are encountered.
|
||||
|
||||
Returns:
|
||||
AsyncJSONKey: The initialized AsyncJSONKey instance.
|
||||
@@ -287,6 +291,7 @@ class AsyncReplitDb:
|
||||
dtype=dtype,
|
||||
get_default=get_default,
|
||||
discard_bad_data=discard_bad_data,
|
||||
do_raise=do_raise,
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
|
||||
@@ -0,0 +1,128 @@
|
||||
"""Tests for replit.database."""
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
from replit.database import AsyncReplitDb, ReplitDb
|
||||
import requests
|
||||
|
||||
|
||||
class TestAsyncDatabase(unittest.IsolatedAsyncioTestCase):
|
||||
"""Tests for replit.database.AsyncReplitDb."""
|
||||
|
||||
async def asyncSetUp(self) -> None:
|
||||
"""Grab a JWT for all the tests to share."""
|
||||
password = os.environ["PASSWORD"]
|
||||
req = requests.get(
|
||||
"https://database-test-jwt.kochman.repl.co", auth=("test", password)
|
||||
)
|
||||
url = req.text
|
||||
self.db = AsyncReplitDb(url)
|
||||
|
||||
async def asyncTearDown(self) -> None:
|
||||
"""Nuke whatever the test added."""
|
||||
for k in await self.db.keys():
|
||||
await self.db.delete(k)
|
||||
|
||||
async def test_get_set_delete(self) -> None:
|
||||
"""Test that we can get, set, and delete a key."""
|
||||
await self.db.set("test-key", "value")
|
||||
|
||||
val = await self.db.get("test-key")
|
||||
self.assertEqual(val, "value")
|
||||
|
||||
await self.db.delete("test-key")
|
||||
with self.assertRaises(KeyError):
|
||||
await self.db.get("test-key")
|
||||
|
||||
async def test_list_keys(self) -> None:
|
||||
"""Test that we can list keys."""
|
||||
key = "test-list-keys-with\nnewline"
|
||||
await self.db.set(key, "value")
|
||||
|
||||
val = await self.db.get(key)
|
||||
self.assertEqual(val, "value")
|
||||
|
||||
keys = await self.db.list(key)
|
||||
self.assertEqual(keys, (key,))
|
||||
|
||||
await self.db.delete(key)
|
||||
with self.assertRaises(KeyError):
|
||||
await self.db.get(key)
|
||||
|
||||
async def test_list_values(self) -> None:
|
||||
"""Test that we can get all values."""
|
||||
key = "test-list-values"
|
||||
await self.db.set(key + "1", "value1")
|
||||
await self.db.set(key + "2", "value2")
|
||||
|
||||
vals = await self.db.values()
|
||||
self.assertTupleEqual(vals, ("value1", "value2"))
|
||||
|
||||
async def test_dict(self) -> None:
|
||||
"""Test that we can get a dict."""
|
||||
await self.db.set("key1", "value")
|
||||
await self.db.set("key2", "value")
|
||||
d = await self.db.to_dict()
|
||||
self.assertDictEqual(d, {"key1": "value", "key2": "value"})
|
||||
|
||||
async def test_jsonkey(self) -> None:
|
||||
"""Test replit.database.AsyncJSONKey."""
|
||||
key = "test-jsonkey"
|
||||
|
||||
jk = self.db.jsonkey(key, dtype=str, do_raise=True)
|
||||
with self.assertRaises(KeyError):
|
||||
await jk.get()
|
||||
await jk.set("value")
|
||||
val = await jk.get()
|
||||
self.assertEqual(val, "value")
|
||||
|
||||
async def test_jsonkey_default(self) -> None:
|
||||
"""Test replit.database.AsyncJSONKey with a default callable."""
|
||||
key = "test-jsonkey"
|
||||
|
||||
jk = self.db.jsonkey(key, dtype=str, get_default=lambda: "value")
|
||||
val = await jk.get()
|
||||
self.assertEqual(val, "value")
|
||||
|
||||
|
||||
class TestDatabase(unittest.IsolatedAsyncioTestCase):
|
||||
"""Tests for replit.database.ReplitDb."""
|
||||
|
||||
def setUp(self) -> None:
|
||||
"""Grab a JWT for all the tests to share."""
|
||||
password = os.environ["PASSWORD"]
|
||||
req = requests.get(
|
||||
"https://database-test-jwt.kochman.repl.co", auth=("test", password)
|
||||
)
|
||||
url = req.text
|
||||
self.db = ReplitDb(url)
|
||||
|
||||
async def tearDown(self) -> None:
|
||||
"""Nuke whatever the test added."""
|
||||
for k in await self.db.keys():
|
||||
await self.db.delete(k)
|
||||
|
||||
def test_get_set_delete(self) -> None:
|
||||
"""Test get, set, and delete."""
|
||||
with self.assertRaises(KeyError):
|
||||
self.db.get("key")
|
||||
|
||||
self.db.set("key", "value")
|
||||
val = self.db.get("key")
|
||||
self.assertEqual(val, "value")
|
||||
|
||||
self.db.delete("key")
|
||||
|
||||
def test_dict(self) -> None:
|
||||
"""Test using the database as a dict."""
|
||||
with self.assertRaises(KeyError):
|
||||
val = self.db["hi"]
|
||||
|
||||
self.db["hi"] = "there"
|
||||
val = self.db.get("hi")
|
||||
self.assertEqual(val, "there")
|
||||
|
||||
del self.db["hi"]
|
||||
with self.assertRaises(KeyError):
|
||||
val = self.db["hi"]
|
||||
Reference in New Issue
Block a user