From 6ae41209dd161c605bce126561295b22ebdf8e2f Mon Sep 17 00:00:00 2001 From: Filip Wasilewski Date: Wed, 24 Oct 2012 23:32:00 +0200 Subject: [PATCH] Invalidate broken connections --- django_postgrespool/base.py | 65 ++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/django_postgrespool/base.py b/django_postgrespool/base.py index 989f711..aa4628b 100644 --- a/django_postgrespool/base.py +++ b/django_postgrespool/base.py @@ -5,10 +5,12 @@ from functools import partial from sqlalchemy import event from sqlalchemy.pool import manage, QueuePool +from psycopg2 import InterfaceError, ProgrammingError, OperationalError from django.conf import settings from django.db.backends.postgresql_psycopg2.base import * from django.db.backends.postgresql_psycopg2.base import DatabaseWrapper as Psycopg2DatabaseWrapper +from django.db.backends.postgresql_psycopg2.base import CursorWrapper as DjangoCursorWrapper POOL_SETTINGS = 'DATABASE_POOL_ARGS' @@ -29,6 +31,67 @@ if settings.DEBUG: event.listen(QueuePool, 'connect', partial(_log, 'new connection')) +def is_disconnect(e, connection, cursor): + """ + Connection state check from SQLAlchemy: + https://bitbucket.org/sqlalchemy/sqlalchemy/src/tip/lib/sqlalchemy/dialects/postgresql/psycopg2.py + """ + if isinstance(e, OperationalError): + # these error messages from libpq: interfaces/libpq/fe-misc.c. + # TODO: these are sent through gettext in libpq and we can't + # check within other locales - consider using connection.closed + return 'terminating connection' in str(e) or \ + 'closed the connection' in str(e) or \ + 'connection not open' in str(e) or \ + 'could not receive data from server' in str(e) + elif isinstance(e, InterfaceError): + # psycopg2 client errors, psycopg2/conenction.h, psycopg2/cursor.h + return 'connection already closed' in str(e) or \ + 'cursor already closed' in str(e) + elif isinstance(e, ProgrammingError): + # not sure where this path is originally from, it may + # be obsolete. It really says "losed", not "closed". + return "losed the connection unexpectedly" in str(e) + else: + return False + + +class CursorWrapper(DjangoCursorWrapper): + """ + A thin wrapper around psycopg2's normal cursor class so that we can catch + particular exception instances and reraise them with the right types. + + Checks for connection state on DB API error and invalidates + broken connections. + """ + + def __init__(self, cursor, connection): + self.cursor = cursor + self.connection = connection + + def execute(self, query, args=None): + try: + return self.cursor.execute(query, args) + except Database.IntegrityError, e: + raise utils.IntegrityError, utils.IntegrityError(*tuple(e)), sys.exc_info()[2] + except Database.DatabaseError, e: + if is_disconnect(e, self.connection.connection, self.cursor): + log.error("invalidating broken connection") + self.connection.invalidate() + raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2] + + def executemany(self, query, args): + try: + return self.cursor.executemany(query, args) + except Database.IntegrityError, e: + raise utils.IntegrityError, utils.IntegrityError(*tuple(e)), sys.exc_info()[2] + except Database.DatabaseError, e: + if is_disconnect(e, self.connection.connection, self.cursor): + log.error("invalidating broken connection") + self.connection.invalidate() + raise utils.DatabaseError, utils.DatabaseError(*tuple(e)), sys.exc_info()[2] + + class DatabaseWrapper(Psycopg2DatabaseWrapper): """SQLAlchemy FTW.""" @@ -80,4 +143,4 @@ class DatabaseWrapper(Psycopg2DatabaseWrapper): connection_created.send(sender=self.__class__, connection=self) cursor = self.connection.cursor() cursor.tzinfo_factory = utc_tzinfo_factory if settings.USE_TZ else None - return CursorWrapper(cursor) + return CursorWrapper(cursor, self.connection)