Merge pull request #4 from nigma/master

Invalidate broken connections
This commit is contained in:
Kenneth Reitz
2012-10-27 08:42:51 -07:00
+64 -1
View File
@@ -5,10 +5,12 @@ from functools import partial
from sqlalchemy import event
from sqlalchemy.pool import manage, QueuePool
from psycopg2 import InterfaceError, ProgrammingError, OperationalError
from django.conf import settings
from django.db.backends.postgresql_psycopg2.base import *
from django.db.backends.postgresql_psycopg2.base import DatabaseWrapper as Psycopg2DatabaseWrapper
from django.db.backends.postgresql_psycopg2.base import CursorWrapper as DjangoCursorWrapper
POOL_SETTINGS = 'DATABASE_POOL_ARGS'
@@ -29,6 +31,67 @@ if settings.DEBUG:
event.listen(QueuePool, 'connect', partial(_log, 'new connection'))
def is_disconnect(e, connection, cursor):
"""
Connection state check from SQLAlchemy:
https://bitbucket.org/sqlalchemy/sqlalchemy/src/tip/lib/sqlalchemy/dialects/postgresql/psycopg2.py
"""
if isinstance(e, OperationalError):
# these error messages from libpq: interfaces/libpq/fe-misc.c.
# TODO: these are sent through gettext in libpq and we can't
# check within other locales - consider using connection.closed
return 'terminating connection' in str(e) or \
'closed the connection' in str(e) or \
'connection not open' in str(e) or \
'could not receive data from server' in str(e)
elif isinstance(e, InterfaceError):
# psycopg2 client errors, psycopg2/conenction.h, psycopg2/cursor.h
return 'connection already closed' in str(e) or \
'cursor already closed' in str(e)
elif isinstance(e, ProgrammingError):
# not sure where this path is originally from, it may
# be obsolete. It really says "losed", not "closed".
return "losed the connection unexpectedly" in str(e)
else:
return False
class CursorWrapper(DjangoCursorWrapper):
"""
A thin wrapper around psycopg2's normal cursor class so that we can catch
particular exception instances and reraise them with the right types.
Checks for connection state on DB API error and invalidates
broken connections.
"""
def __init__(self, cursor, connection):
self.cursor = cursor
self.connection = connection
def execute(self, query, args=None):
try:
return self.cursor.execute(query, args)
except Database.IntegrityError, e:
raise utils.IntegrityError, utils.IntegrityError(*tuple(e)), sys.exc_info()[2]
except Database.DatabaseError, e:
if is_disconnect(e, self.connection.connection, self.cursor):
log.error("invalidating broken connection")
self.connection.invalidate()
raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2]
def executemany(self, query, args):
try:
return self.cursor.executemany(query, args)
except Database.IntegrityError, e:
raise utils.IntegrityError, utils.IntegrityError(*tuple(e)), sys.exc_info()[2]
except Database.DatabaseError, e:
if is_disconnect(e, self.connection.connection, self.cursor):
log.error("invalidating broken connection")
self.connection.invalidate()
raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2]
class DatabaseWrapper(Psycopg2DatabaseWrapper):
"""SQLAlchemy FTW."""
@@ -80,4 +143,4 @@ class DatabaseWrapper(Psycopg2DatabaseWrapper):
connection_created.send(sender=self.__class__, connection=self)
cursor = self.connection.cursor()
cursor.tzinfo_factory = utc_tzinfo_factory if settings.USE_TZ else None
return CursorWrapper(cursor)
return CursorWrapper(cursor, self.connection)