mirror of
https://github.com/kennethreitz-archive/www.gittip.com.git
synced 2026-06-19 23:00:57 +00:00
5853c273b6
Deletion is like absorption, and absorption is not ported to SQLAlchemy yet. I factored out the common part (archiving) and implemented deletion using the old wiring. This includes a few tests, but they're borked atm.
491 lines
17 KiB
Python
491 lines
17 KiB
Python
"""Helpers for testing Gittip.
|
|
"""
|
|
from __future__ import unicode_literals
|
|
|
|
import datetime
|
|
import copy
|
|
import os
|
|
import random
|
|
import re
|
|
import unittest
|
|
from decimal import Decimal
|
|
from os.path import join, dirname, realpath
|
|
|
|
import gittip
|
|
from aspen import resources
|
|
from aspen.testing import Website, StubRequest
|
|
from aspen.utils import utcnow
|
|
from gittip import orm, wireup
|
|
from gittip.models.participant import Participant
|
|
from gittip.authentication import User
|
|
from gittip.billing.payday import Payday
|
|
|
|
|
|
TOP = join(realpath(dirname(dirname(__file__))), '..')
|
|
SCHEMA = open(join(TOP, "schema.sql")).read()
|
|
|
|
DUMMY_GITHUB_JSON = u'{"html_url":"https://github.com/whit537","type":"User","public_repos":25,"blog":"http://whit537.org/","gravatar_id":"fb054b407a6461e417ee6b6ae084da37","public_gists":29,"following":15,"updated_at":"2013-01-14T13:43:23Z","company":"Gittip","events_url":"https://api.github.com/users/whit537/events{/privacy}","repos_url":"https://api.github.com/users/whit537/repos","gists_url":"https://api.github.com/users/whit537/gists{/gist_id}","email":"chad@zetaweb.com","organizations_url":"https://api.github.com/users/whit537/orgs","hireable":false,"received_events_url":"https://api.github.com/users/whit537/received_events","starred_url":"https://api.github.com/users/whit537/starred{/owner}{/repo}","login":"whit537","created_at":"2009-10-03T02:47:57Z","bio":"","url":"https://api.github.com/users/whit537","avatar_url":"https://secure.gravatar.com/avatar/fb054b407a6461e417ee6b6ae084da37?d=https://a248.e.akamai.net/assets.github.com%2Fimages%2Fgravatars%2Fgravatar-user-420.png","followers":90,"name":"Chad Whitacre","followers_url":"https://api.github.com/users/whit537/followers","following_url":"https://api.github.com/users/whit537/following","id":134455,"location":"Pittsburgh, PA","subscriptions_url":"https://api.github.com/users/whit537/subscriptions"}'
|
|
"JSON data as returned from github for whit537 ;)"
|
|
|
|
GITHUB_USER_UNREGISTERED_LGTEST = u'{"public_repos":0,"html_url":"https://github.com/lgtest","type":"User","repos_url":"https://api.github.com/users/lgtest/repos","gravatar_id":"d41d8cd98f00b204e9800998ecf8427e","following":0,"public_gists":0,"updated_at":"2013-01-04T17:24:57Z","received_events_url":"https://api.github.com/users/lgtest/received_events","gists_url":"https://api.github.com/users/lgtest/gists{/gist_id}","events_url":"https://api.github.com/users/lgtest/events{/privacy}","organizations_url":"https://api.github.com/users/lgtest/orgs","avatar_url":"https://secure.gravatar.com/avatar/d41d8cd98f00b204e9800998ecf8427e?d=https://a248.e.akamai.net/assets.github.com%2Fimages%2Fgravatars%2Fgravatar-user-420.png","login":"lgtest","created_at":"2012-05-24T20:09:07Z","starred_url":"https://api.github.com/users/lgtest/starred{/owner}{/repo}","url":"https://api.github.com/users/lgtest","followers":0,"followers_url":"https://api.github.com/users/lgtest/followers","following_url":"https://api.github.com/users/lgtest/following","id":1775515,"subscriptions_url":"https://api.github.com/users/lgtest/subscriptions"}'
|
|
"JSON data as returned from github for unregistered user ``lgtest``"
|
|
|
|
|
|
def create_schema(db):
|
|
db.execute(SCHEMA)
|
|
|
|
GITHUB_USERS = [ ("1775515", "lgtest")
|
|
, ("1903357", "lglocktest")
|
|
, ("1933953", "gittip-test-0")
|
|
, ("1933959", "gittip-test-1")
|
|
, ("1933965", "gittip-test-2")
|
|
, ("1933967", "gittip-test-3")
|
|
]
|
|
|
|
def populate_db_with_dummy_data(db):
|
|
from gittip.elsewhere.github import GitHubAccount
|
|
from gittip.participant import Participant
|
|
for user_id, login in GITHUB_USERS:
|
|
account = GitHubAccount(user_id, {"id": user_id, "login": login})
|
|
Participant(account.participant_id).change_id(login)
|
|
|
|
|
|
class Harness(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.db = orm.db
|
|
cls.session = orm.db.session
|
|
|
|
def setUp(self):
|
|
pass
|
|
|
|
def tearDown(self):
|
|
self.db.empty_tables()
|
|
|
|
def make_participant(self, participant_id, **kw):
|
|
participant = Participant(id=participant_id, **kw)
|
|
self.session.add(participant)
|
|
self.session.commit()
|
|
return participant
|
|
|
|
|
|
class GittipBaseDBTest(unittest.TestCase):
|
|
"""
|
|
|
|
Will setup a db connection so we can perform db operations. Everything is
|
|
performed in a transaction and will be rolled back at the end of the test
|
|
so we don't clutter up the db.
|
|
|
|
"""
|
|
def setUp(self):
|
|
populate_db_with_dummy_data(self.db)
|
|
self.conn = self.db.get_connection()
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
cls.db = gittip.db = wireup.db()
|
|
|
|
def tearDown(self):
|
|
# TODO: rollback transaction here so we don't fill up test db.
|
|
# TODO: hack for now, truncate all tables.
|
|
tables = [ 'participants'
|
|
, 'elsewhere'
|
|
, 'tips'
|
|
, 'transfers'
|
|
, 'paydays'
|
|
, 'exchanges'
|
|
, 'absorptions'
|
|
, 'deletions'
|
|
]
|
|
for t in tables:
|
|
self.db.execute('truncate table %s cascade' % t)
|
|
|
|
|
|
class GittipPaydayTest(GittipBaseDBTest):
|
|
|
|
def setUp(self):
|
|
super(GittipPaydayTest, self).setUp()
|
|
self.payday = Payday(self.db)
|
|
|
|
|
|
# Helpers for managing test data.
|
|
# ===============================
|
|
|
|
colname_re = re.compile("^[A-Za-z0-9_]+$")
|
|
|
|
class Context(object):
|
|
"""This is a context manager for testing.
|
|
|
|
load = testing.Context()
|
|
|
|
def test():
|
|
with load(*data):
|
|
actual = my_func()
|
|
expected = "Cheese whiz!"
|
|
assert actual == expected, actual
|
|
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.db = wireup.db()
|
|
self.billing = wireup.billing()
|
|
self._delete_data()
|
|
|
|
def __call__(self, *data):
|
|
"""Load up the database with data.
|
|
|
|
Here's the format for data:
|
|
|
|
( "table1", (), {}
|
|
, "table2", {}, [], {}
|
|
)
|
|
|
|
If it's a basestring it's a table name, if it's a dict it's a mapping
|
|
of colname to value, if it's a tuple or list it's a sequence of values.
|
|
|
|
"""
|
|
known_tables = self._get_table_names()
|
|
table_name = ""
|
|
|
|
for thing in data:
|
|
|
|
typ = type(thing)
|
|
|
|
if typ in (str, unicode):
|
|
table_name = thing
|
|
if table_name not in known_tables: # SQLi pro
|
|
raise ValueError("Unknown table: %s" % table_name)
|
|
continue
|
|
|
|
if not table_name:
|
|
raise ValueError("What table am I INSERTing into?")
|
|
|
|
row = thing
|
|
n = len(row)
|
|
|
|
if typ is dict:
|
|
colnames = []
|
|
values = []
|
|
for colname, value in sorted(row.iteritems()):
|
|
if colname_re.match(colname) is None: # SQLi pro
|
|
raise ValueError( "colname must match %s"
|
|
% colname_re.pattern)
|
|
colnames.append(colname)
|
|
values.append(value)
|
|
colnames = ' (%s) ' % ', '.join(colnames)
|
|
elif typ in (list, tuple):
|
|
colnames = ' '
|
|
values = thing
|
|
|
|
values = tuple(values)
|
|
value_placeholders = ', '.join(['%s'] * n)
|
|
|
|
SQL = "INSERT INTO %s%sVALUES (%s)"
|
|
SQL %= (table_name, colnames, value_placeholders)
|
|
|
|
self.db.execute(SQL, values)
|
|
|
|
self.a = self.dump()
|
|
return self
|
|
|
|
def __enter__(self):
|
|
return self
|
|
|
|
def __exit__(self, *a):
|
|
orm.rollback()
|
|
self._delete_data()
|
|
|
|
def diff(self, compact=False):
|
|
"""Compare the data state now with when we started.
|
|
"""
|
|
a = copy.deepcopy(self.a) # avoid mutation
|
|
b = self.dump()
|
|
return self._diff(a, b, compact)
|
|
|
|
def _diff(self, a, b, compact):
|
|
"""Compare two data dumps.
|
|
"""
|
|
out = {}
|
|
pkeys = self._get_primary_keys()
|
|
assert sorted(a.keys()) == sorted(b.keys()), \
|
|
"Sorry, diff isn't designed for DDL"
|
|
for table_name, b_table in b.items():
|
|
a_table = a[table_name]
|
|
|
|
inserts = []
|
|
updates = []
|
|
deletes = []
|
|
|
|
# Be sure to sort {a,b}_table.items() so we can depend on the sort
|
|
# order of the inserts, updates, and deletes lists.
|
|
# See https://github.com/zetaweb/www.gittip.com/issues/413.
|
|
|
|
for key, row in sorted(b_table.items()):
|
|
if key not in a_table:
|
|
inserts.append(row)
|
|
else:
|
|
update = {}
|
|
for colname, value in row.items():
|
|
if a_table[key][colname] != value:
|
|
update[colname] = value
|
|
if update:
|
|
pkey = pkeys[table_name]
|
|
update[pkey] = row[pkey] # include primary key
|
|
updates.append(update)
|
|
|
|
for key, row in sorted(a_table.items()):
|
|
if key not in b_table:
|
|
deletes.append(row)
|
|
|
|
if inserts or updates or deletes:
|
|
out[table_name] = {}
|
|
if compact:
|
|
out[table_name] = [ len(inserts)
|
|
, len(updates)
|
|
, len(deletes)
|
|
]
|
|
else:
|
|
out[table_name] = { "inserts": inserts
|
|
, "updates": updates
|
|
, "deletes": deletes
|
|
}
|
|
|
|
return out
|
|
|
|
def dump(self):
|
|
"""Return a dump of the database.
|
|
|
|
Format:
|
|
|
|
{ "table1": {1: {}, 2: {}}
|
|
, "table2": {1: {}}
|
|
}
|
|
|
|
That's table name to a mapping of primary key to the entire row as a
|
|
dict.
|
|
|
|
"""
|
|
out = {}
|
|
pkeys = self._get_primary_keys()
|
|
for table_name in self._get_table_names():
|
|
pkey = pkeys[table_name]
|
|
rows = self.db.fetchall("SELECT * FROM %s ORDER BY %s"
|
|
% (table_name, pkey))
|
|
if rows is None:
|
|
rows = []
|
|
mapped = {}
|
|
for row in rows:
|
|
key = row[pkey]
|
|
mapped[key] = row
|
|
out[table_name] = mapped
|
|
return out
|
|
|
|
def _get_table_names(self):
|
|
"""Return a sorted list of tables in the public schema.
|
|
"""
|
|
tables = self.db.fetchall("SELECT tablename FROM pg_tables "
|
|
"WHERE schemaname='public'")
|
|
if tables is None:
|
|
tables = []
|
|
else:
|
|
tables = [rec['tablename'] for rec in tables]
|
|
tables.sort()
|
|
return tables
|
|
|
|
def _get_primary_keys(self):
|
|
"""Return a mapping of table name in the public schema to primary key.
|
|
"""
|
|
_pkeys = self.db.fetchall("""
|
|
|
|
SELECT tablename, indexdef
|
|
FROM pg_indexes
|
|
WHERE schemaname='public'
|
|
AND indexname LIKE '%_pkey'
|
|
|
|
""")
|
|
if _pkeys is None:
|
|
_pkeys = []
|
|
else:
|
|
pkeys = {}
|
|
for row in _pkeys:
|
|
pkey = row['indexdef'].split('(')[1].split(')')[0]
|
|
pkeys[row['tablename']] = pkey
|
|
return pkeys
|
|
|
|
def _delete_data(self):
|
|
"""Delete all data from all tables in the public schema (eep!).
|
|
"""
|
|
safety_belt = os.environ["YES_PLEASE_DELETE_ALL_MY_DATA_VERY_OFTEN"]
|
|
if safety_belt != "Pretty please, with sugar on top.":
|
|
raise Exception("Heck.")
|
|
|
|
for table_name in self._get_table_names():
|
|
self.db.execute("TRUNCATE TABLE %s CASCADE" % table_name)
|
|
|
|
load = Context()
|
|
|
|
def start_payday(*data):
|
|
context = load(*data)
|
|
context.payday = Payday(gittip.db)
|
|
ts_start = context.payday.start()
|
|
context.payday.zero_out_pending(ts_start)
|
|
context.ts_start = ts_start
|
|
return context
|
|
|
|
|
|
def setup_tips(*recs):
|
|
"""Setup some participants and tips. recs is a list of:
|
|
|
|
("tipper", "tippee", '2.00', True, False, True, "github", "12345")
|
|
^ ^ ^
|
|
| | |
|
|
| | -- claimed?
|
|
| -- is_suspicious?
|
|
|-- good cc?
|
|
|
|
tipper must be a unicode
|
|
tippee can be None or unicode
|
|
amount can be None or unicode
|
|
good_cc can be True, False, or None
|
|
is_suspicious can be True, False, or None
|
|
claimed can be True or False
|
|
platform can be unicode
|
|
user_id can be unicode
|
|
|
|
"""
|
|
tips = []
|
|
|
|
_participants = {}
|
|
randid = lambda: unicode(random.randint(1, 1000000))
|
|
|
|
for rec in recs:
|
|
good_cc, is_suspicious, claimed, platform, user_id = \
|
|
(True, False, True, "github", randid())
|
|
|
|
if len(rec) == 3:
|
|
tipper, tippee, amount = rec
|
|
elif len(rec) == 4:
|
|
tipper, tippee, amount, good_cc = rec
|
|
is_suspicious, claimed = (False, True)
|
|
elif len(rec) == 5:
|
|
tipper, tippee, amount, good_cc, is_suspicious = rec
|
|
claimed = True
|
|
elif len(rec) == 6:
|
|
tipper, tippee, amount, good_cc, is_suspicious, claimed = rec
|
|
elif len(rec) == 7:
|
|
tipper, tippee, amount, good_cc, is_suspicious, claimed, platform \
|
|
= rec
|
|
elif len(rec) == 8:
|
|
tipper, tippee, amount, good_cc, is_suspicious, claimed, \
|
|
platform, user_id = rec
|
|
else:
|
|
raise Exception(rec)
|
|
|
|
assert good_cc in (True, False, None), good_cc
|
|
assert is_suspicious in (True, False, None), is_suspicious
|
|
_participants[tipper] = \
|
|
(good_cc, is_suspicious, True, platform, user_id)
|
|
|
|
if tippee is None:
|
|
continue
|
|
assert claimed in (True, False), claimed # refers to tippee
|
|
if tippee not in _participants:
|
|
_participants[tippee] = (None, False, claimed, "github", randid())
|
|
now = utcnow()
|
|
tips.append({ "ctime": now
|
|
, "mtime": now
|
|
, "tipper": tipper
|
|
, "tippee": tippee
|
|
, "amount": Decimal(amount)
|
|
})
|
|
|
|
then = utcnow() - datetime.timedelta(seconds=3600)
|
|
|
|
participants = []
|
|
elsewhere = []
|
|
for participant_id, crap in _participants.items():
|
|
(good_cc, is_suspicious, claimed, platform, user_id) = crap
|
|
username_key = "login" if platform == 'github' else "screen_name"
|
|
elsewhere.append({ "platform": platform
|
|
, "user_id": user_id
|
|
, "participant_id": participant_id
|
|
, "user_info": { "id": user_id
|
|
, username_key: participant_id
|
|
}
|
|
})
|
|
rec = {"id": participant_id}
|
|
if good_cc is not None:
|
|
rec["last_bill_result"] = "" if good_cc else "Failure!"
|
|
rec["balanced_account_uri"] = "/v1/blah/blah/" + participant_id
|
|
rec["is_suspicious"] = is_suspicious
|
|
if claimed:
|
|
rec["claimed_time"] = then
|
|
participants.append(rec)
|
|
|
|
return ["participants"] + participants \
|
|
+ ["tips"] + tips \
|
|
+ ["elsewhere"] + elsewhere
|
|
|
|
|
|
def tip_graph(*a, **kw):
|
|
context = load(*setup_tips(*a, **kw))
|
|
|
|
def resolve_elsewhere(participant_id):
|
|
recs = context.db.fetchall( "SELECT platform, user_id FROM elsewhere "
|
|
"WHERE participant_id=%s"
|
|
, (participant_id,)
|
|
)
|
|
if recs is not None:
|
|
recs = [(rec['platform'], rec['user_id']) for rec in recs]
|
|
return recs
|
|
|
|
context.resolve_elsewhere = resolve_elsewhere # Wheeee! :D
|
|
|
|
return context
|
|
|
|
|
|
def looks_random(s):
|
|
"""Sniff strings per gittip.participant.gen_random_participant_ids.
|
|
"""
|
|
return re.match(r'^[0-9a-f]{12}$', s)
|
|
|
|
|
|
# Helpers for testing simplates.
|
|
# ==============================
|
|
|
|
test_website = Website([ '--www_root', str(join(TOP, 'www'))
|
|
, '--project_root', str(TOP)
|
|
])
|
|
|
|
def serve_request(path, user=None):
|
|
"""Given an URL path, return response.
|
|
"""
|
|
request = StubRequest(path)
|
|
request.website = test_website
|
|
if user is not None:
|
|
user = User.from_id(user)
|
|
# Note that Cookie needs a bytestring.
|
|
request.headers.cookie[str('session')] = user.session_token
|
|
response = test_website.handle_safely(request)
|
|
return response
|
|
|
|
def load_simplate(path):
|
|
"""Given an URL path, return resource.
|
|
"""
|
|
request = StubRequest(path)
|
|
request.website = test_website
|
|
|
|
# XXX HACK - aspen.website should be refactored
|
|
from aspen import dispatcher, sockets
|
|
test_website.hooks.run('inbound_early', request)
|
|
dispatcher.dispatch(request) # sets request.fs
|
|
request.socket = sockets.get(request)
|
|
test_website.hooks.run('inbound_late', request)
|
|
|
|
return resources.get(request)
|
|
|
|
if __name__ == "__main__":
|
|
db = wireup.db()
|
|
populate_db_with_dummy_data(db)
|