diff --git a/HISTORY.rst b/HISTORY.rst index fc358002..ed90cd94 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,14 @@ History ------- +0.13.4 (2012-xx-xx) ++++++++++++++++++++ + +- Fix leaking connections (from urllib3 update) +- OAuthlib path hack fi +- App Engine 2.7 Fixes! + + 0.13.3 (2012-07-12) +++++++++++++++++++ diff --git a/requests/packages/urllib3/_collections.py b/requests/packages/urllib3/_collections.py index 3cef081e..a052b1da 100644 --- a/requests/packages/urllib3/_collections.py +++ b/requests/packages/urllib3/_collections.py @@ -4,128 +4,91 @@ # This module is part of urllib3 and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php -from collections import deque +from collections import MutableMapping +from threading import Lock + +try: # Python 2.7+ + from collections import OrderedDict +except ImportError: + from .packages.ordered_dict import OrderedDict -from threading import RLock __all__ = ['RecentlyUsedContainer'] -class AccessEntry(object): - __slots__ = ('key', 'is_valid') - - def __init__(self, key, is_valid=True): - self.key = key - self.is_valid = is_valid +_Null = object() -class RecentlyUsedContainer(dict): +class RecentlyUsedContainer(MutableMapping): """ - Provides a dict-like that maintains up to ``maxsize`` keys while throwing - away the least-recently-used keys beyond ``maxsize``. + Provides a thread-safe dict-like container which maintains up to + ``maxsize`` keys while throwing away the least-recently-used keys beyond + ``maxsize``. + + :param maxsize: + Maximum number of recent elements to retain. + + :param dispose_func: + Every time an item is evicted from the container, + ``dispose_func(value)`` is called. Callback which will get called """ - # If len(self.access_log) exceeds self._maxsize * CLEANUP_FACTOR, then we - # will attempt to cleanup the invalidated entries in the access_log - # datastructure during the next 'get' operation. - CLEANUP_FACTOR = 10 + ContainerCls = OrderedDict - def __init__(self, maxsize=10): + def __init__(self, maxsize=10, dispose_func=None): self._maxsize = maxsize + self.dispose_func = dispose_func - self._container = {} - - # We use a deque to to store our keys ordered by the last access. - self.access_log = deque() - self.access_log_lock = RLock() - - # We look up the access log entry by the key to invalidate it so we can - # insert a new authorative entry at the head without having to dig and - # find the old entry for removal immediately. - self.access_lookup = {} - - # Trigger a heap cleanup when we get past this size - self.access_log_limit = maxsize * self.CLEANUP_FACTOR - - def _invalidate_entry(self, key): - "If exists: Invalidate old entry and return it." - old_entry = self.access_lookup.get(key) - if old_entry: - old_entry.is_valid = False - - return old_entry - - def _push_entry(self, key): - "Push entry onto our access log, invalidate the old entry if exists." - self._invalidate_entry(key) - - new_entry = AccessEntry(key) - self.access_lookup[key] = new_entry - - self.access_log_lock.acquire() - self.access_log.appendleft(new_entry) - self.access_log_lock.release() - - def _prune_entries(self, num): - "Pop entries from our access log until we popped ``num`` valid ones." - while num > 0: - self.access_log_lock.acquire() - p = self.access_log.pop() - self.access_log_lock.release() - - if not p.is_valid: - continue # Invalidated entry, skip - - dict.pop(self, p.key, None) - self.access_lookup.pop(p.key, None) - num -= 1 - - def _prune_invalidated_entries(self): - "Rebuild our access_log without the invalidated entries." - self.access_log_lock.acquire() - self.access_log = deque(e for e in self.access_log if e.is_valid) - self.access_log_lock.release() - - def _get_ordered_access_keys(self): - "Return ordered access keys for inspection. Used for testing." - self.access_log_lock.acquire() - r = [e.key for e in self.access_log if e.is_valid] - self.access_log_lock.release() - - return r + self._container = self.ContainerCls() + self._lock = Lock() def __getitem__(self, key): - item = dict.get(self, key) + # Re-insert the item, moving it to the end of the eviction line. + with self._lock: + item = self._container.pop(key) + self._container[key] = item + return item - if not item: - raise KeyError(key) + def __setitem__(self, key, value): + evicted_value = _Null + with self._lock: + # Possibly evict the existing value of 'key' + evicted_value = self._container.get(key, _Null) + self._container[key] = value - # Insert new entry with new high priority, also implicitly invalidates - # the old entry. - self._push_entry(key) + # If we didn't evict an existing value, we might have to evict the + # least recently used item from the beginning of the container. + if len(self._container) > self._maxsize: + _key, evicted_value = self._container.popitem(last=False) - if len(self.access_log) > self.access_log_limit: - # Heap is getting too big, try to clean up any tailing invalidated - # entries. - self._prune_invalidated_entries() - - return item - - def __setitem__(self, key, item): - # Add item to our container and access log - dict.__setitem__(self, key, item) - self._push_entry(key) - - # Discard invalid and excess entries - self._prune_entries(len(self) - self._maxsize) + if self.dispose_func and evicted_value is not _Null: + self.dispose_func(evicted_value) def __delitem__(self, key): - self._invalidate_entry(key) - self.access_lookup.pop(key, None) - dict.__delitem__(self, key) + with self._lock: + value = self._container.pop(key) - def get(self, key, default=None): - try: - return self[key] - except KeyError: - return default + if self.dispose_func: + self.dispose_func(value) + + def __len__(self): + with self._lock: + return len(self._container) + + def __iter__(self): + raise NotImplementedError('Iteration over this class is unlikely to be threadsafe.') + + def clear(self): + with self._lock: + # Copy pointers to all values, then wipe the mapping + # under Python 2, this copies the list of values twice :-| + values = list(self._container.values()) + self._container.clear() + + if self.dispose_func: + for value in values: + self.dispose_func(value) + + def keys(self): + with self._lock: + return self._container.keys() diff --git a/requests/packages/urllib3/connectionpool.py b/requests/packages/urllib3/connectionpool.py index 071fd7e0..97da5446 100644 --- a/requests/packages/urllib3/connectionpool.py +++ b/requests/packages/urllib3/connectionpool.py @@ -7,27 +7,27 @@ import logging import socket -from socket import error as SocketError, timeout as SocketTimeout +from socket import timeout as SocketTimeout -try: # Python 3 +try: # Python 3 from http.client import HTTPConnection, HTTPException from http.client import HTTP_PORT, HTTPS_PORT except ImportError: from httplib import HTTPConnection, HTTPException from httplib import HTTP_PORT, HTTPS_PORT -try: # Python 3 +try: # Python 3 from queue import LifoQueue, Empty, Full except ImportError: from Queue import LifoQueue, Empty, Full -try: # Compiled with SSL? +try: # Compiled with SSL? HTTPSConnection = object BaseSSLError = None ssl = None - try: # Python 3 + try: # Python 3 from http.client import HTTPSConnection except ImportError: from httplib import HTTPSConnection @@ -35,7 +35,7 @@ try: # Compiled with SSL? import ssl BaseSSLError = ssl.SSLError -except (ImportError, AttributeError): +except (ImportError, AttributeError): # Platform-specific: No SSL. pass @@ -43,6 +43,7 @@ from .request import RequestMethods from .response import HTTPResponse from .util import get_host, is_connection_dropped from .exceptions import ( + ClosedPoolError, EmptyPoolError, HostChangedError, MaxRetryError, @@ -206,10 +207,8 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods): try: conn = self.pool.get(block=self.block, timeout=timeout) - # If this is a persistent connection, check if it got disconnected - if conn and is_connection_dropped(conn): - log.info("Resetting dropped connection: %s" % self.host) - conn.close() + except AttributeError: # self.pool is None + raise ClosedPoolError(self, "Pool is closed.") except Empty: if self.block: @@ -218,6 +217,11 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods): "connections are allowed.") pass # Oh well, we'll create a new connection then + # If this is a persistent connection, check if it got disconnected + if conn and is_connection_dropped(conn): + log.info("Resetting dropped connection: %s" % self.host) + conn.close() + return conn or self._new_conn() def _put_conn(self, conn): @@ -228,17 +232,26 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods): Connection object for the current host and port as returned by :meth:`._new_conn` or :meth:`._get_conn`. - If the pool is already full, the connection is discarded because we - exceeded maxsize. If connections are discarded frequently, then maxsize - should be increased. + If the pool is already full, the connection is closed and discarded + because we exceeded maxsize. If connections are discarded frequently, + then maxsize should be increased. + + If the pool is closed, then the connection will be closed and discarded. """ try: self.pool.put(conn, block=False) + return # Everything is dandy, done. + except AttributeError: + # self.pool is None. + pass except Full: # This should never happen if self.block == True log.warning("HttpConnectionPool is full, discarding connection: %s" % self.host) + # Connection never got put back into the pool, close it. + conn.close() + def _make_request(self, conn, method, url, timeout=_Default, **httplib_request_kw): """ @@ -268,15 +281,32 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods): log.debug("\"%s %s %s\" %s %s" % (method, url, http_version, httplib_response.status, httplib_response.length)) - return httplib_response + def close(self): + """ + Close all pooled connections and disable the pool. + """ + # Disable access to the pool + old_pool, self.pool = self.pool, None + + try: + while True: + conn = old_pool.get(block=False) + if conn: + conn.close() + + except Empty: + pass # Done. def is_same_host(self, url): """ Check if the given ``url`` is a member of the same host as this connection pool. """ + if url.startswith('/'): + return True + # TODO: Add optional support for socket.gethostbyname checking. scheme, host, port = get_host(url) @@ -284,8 +314,7 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods): # Use explicit default port for comparison when none is given. port = port_by_scheme.get(scheme) - return (url.startswith('/') or - (scheme, host, port) == (self.scheme, self.host, self.port)) + return (scheme, host, port) == (self.scheme, self.host, self.port) def urlopen(self, method, url, body=None, headers=None, retries=3, redirect=True, assert_same_host=True, timeout=_Default, @@ -378,7 +407,6 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods): try: # Request a connection from the queue - # (Could raise SocketError: Bad file descriptor) conn = self._get_conn(timeout=pool_timeout) # Make the request on the httplib connection object @@ -421,29 +449,38 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods): # Name mismatch raise SSLError(e) - except (HTTPException, SocketError) as e: + except HTTPException as e: # Connection broken, discard. It will be replaced next _get_conn(). conn = None # This is necessary so we can access e below err = e finally: - if conn and release_conn: - # Put the connection back to be reused + if release_conn: + # Put the connection back to be reused. If the connection is + # expired then it will be None, which will get replaced with a + # fresh connection during _get_conn. self._put_conn(conn) if not conn: + # Try again log.warn("Retrying (%d attempts remain) after connection " "broken by '%r': %s" % (retries, err, url)) return self.urlopen(method, url, body, headers, retries - 1, - redirect, assert_same_host) # Try again + redirect, assert_same_host, + timeout=timeout, pool_timeout=pool_timeout, + release_conn=release_conn, **response_kw) # Handle redirect? redirect_location = redirect and response.get_redirect_location() if redirect_location: + if response.status == 303: + method = 'GET' log.info("Redirecting %s -> %s" % (url, redirect_location)) return self.urlopen(method, redirect_location, body, headers, - retries - 1, redirect, assert_same_host) + retries - 1, redirect, assert_same_host, + timeout=timeout, pool_timeout=pool_timeout, + release_conn=release_conn, **response_kw) return response diff --git a/requests/packages/urllib3/exceptions.py b/requests/packages/urllib3/exceptions.py index 15c9699e..99ebb67e 100644 --- a/requests/packages/urllib3/exceptions.py +++ b/requests/packages/urllib3/exceptions.py @@ -24,6 +24,11 @@ class SSLError(HTTPError): pass +class DecodeError(HTTPError): + "Raised when automatic decoding based on Content-Type fails." + pass + + ## Leaf Exceptions class MaxRetryError(PoolError): @@ -57,6 +62,11 @@ class EmptyPoolError(PoolError): pass +class ClosedPoolError(PoolError): + "Raised when a request enters a pool after the pool has been closed." + pass + + class LocationParseError(ValueError, HTTPError): "Raised when get_host or similar fails to parse the URL input." diff --git a/requests/packages/urllib3/poolmanager.py b/requests/packages/urllib3/poolmanager.py index 310ea21d..8f5b54c1 100644 --- a/requests/packages/urllib3/poolmanager.py +++ b/requests/packages/urllib3/poolmanager.py @@ -8,9 +8,9 @@ import logging from ._collections import RecentlyUsedContainer from .connectionpool import HTTPConnectionPool, HTTPSConnectionPool -from .connectionpool import get_host, connection_from_url, port_by_scheme -from .exceptions import HostChangedError +from .connectionpool import connection_from_url, port_by_scheme from .request import RequestMethods +from .util import parse_url __all__ = ['PoolManager', 'ProxyManager', 'proxy_from_url'] @@ -48,19 +48,29 @@ class PoolManager(RequestMethods): """ - # TODO: Make sure there are no memory leaks here. - def __init__(self, num_pools=10, **connection_pool_kw): self.connection_pool_kw = connection_pool_kw - self.pools = RecentlyUsedContainer(num_pools) + self.pools = RecentlyUsedContainer(num_pools, + dispose_func=lambda p: p.close()) - def connection_from_host(self, host, port=80, scheme='http'): + def clear(self): + """ + Empty our store of pools and direct them all to close. + + This will not affect in-flight connections, but they will not be + re-used after completion. + """ + self.pools.clear() + + def connection_from_host(self, host, port=None, scheme='http'): """ Get a :class:`ConnectionPool` based on the host, port, and scheme. - Note that an appropriate ``port`` value is required here to normalize - connection pools in our container most effectively. + If ``port`` isn't given, it will be derived from the ``scheme`` using + ``urllib3.connectionpool.port_by_scheme``. """ + port = port or port_by_scheme.get(scheme, 80) + pool_key = (scheme, host, port) # If the scheme, host, or port doesn't match existing open connections, @@ -86,26 +96,36 @@ class PoolManager(RequestMethods): Additional parameters are taken from the :class:`.PoolManager` constructor. """ - scheme, host, port = get_host(url) + u = parse_url(url) + return self.connection_from_host(u.host, port=u.port, scheme=u.scheme) - port = port or port_by_scheme.get(scheme, 80) - - return self.connection_from_host(host, port=port, scheme=scheme) - - def urlopen(self, method, url, **kw): + def urlopen(self, method, url, redirect=True, **kw): """ - Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`. + Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen` + with custom cross-host redirect logic and only sends the request-uri + portion of the ``url``. - ``url`` must be absolute, such that an appropriate + The given ``url`` parameter must be absolute, such that an appropriate :class:`urllib3.connectionpool.ConnectionPool` can be chosen for it. """ - conn = self.connection_from_url(url) - try: - return conn.urlopen(method, url, **kw) + u = parse_url(url) + conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme) - except HostChangedError as e: - kw['retries'] = e.retries # Persist retries countdown - return self.urlopen(method, e.url, **kw) + kw['assert_same_host'] = False + kw['redirect'] = False + + response = conn.urlopen(method, u.request_uri, **kw) + + redirect_location = redirect and response.get_redirect_location() + if not redirect_location: + return response + + if response.status == 303: + method = 'GET' + + log.info("Redirecting %s -> %s" % (url, redirect_location)) + kw['retries'] = kw.get('retries', 3) - 1 # Persist retries countdown + return self.urlopen(method, redirect_location, **kw) class ProxyManager(RequestMethods): diff --git a/requests/packages/urllib3/response.py b/requests/packages/urllib3/response.py index 5fab8243..28537d3b 100644 --- a/requests/packages/urllib3/response.py +++ b/requests/packages/urllib3/response.py @@ -10,7 +10,7 @@ import zlib from io import BytesIO -from .exceptions import HTTPError +from .exceptions import DecodeError from .packages.six import string_types as basestring @@ -148,9 +148,9 @@ class HTTPResponse(object): try: if decode_content and decoder: data = decoder(data) - except IOError: - raise HTTPError("Received response with content-encoding: %s, but " - "failed to decode it." % content_encoding) + except (IOError, zlib.error): + raise DecodeError("Received response with content-encoding: %s, but " + "failed to decode it." % content_encoding) if cache_content: self._body = data diff --git a/requests/packages/urllib3/util.py b/requests/packages/urllib3/util.py index 9669ce97..8ec990bc 100644 --- a/requests/packages/urllib3/util.py +++ b/requests/packages/urllib3/util.py @@ -6,6 +6,8 @@ from base64 import b64encode +from collections import namedtuple +from socket import error as SocketError try: from select import poll, POLLIN @@ -20,6 +22,152 @@ from .packages import six from .exceptions import LocationParseError +class Url(namedtuple('Url', ['scheme', 'auth', 'host', 'port', 'path', 'query', 'fragment'])): + """ + Datastructure for representing an HTTP URL. Used as a return value for + :func:`parse_url`. + """ + slots = () + + def __new__(cls, scheme=None, auth=None, host=None, port=None, path=None, query=None, fragment=None): + return super(Url, cls).__new__(cls, scheme, auth, host, port, path, query, fragment) + + @property + def hostname(self): + """For backwards-compatibility with urlparse. We're nice like that.""" + return self.host + + @property + def request_uri(self): + """Absolute path including the query string.""" + uri = self.path or '/' + + if self.query is not None: + uri += '?' + self.query + + return uri + + +def split_first(s, delims): + """ + Given a string and an iterable of delimiters, split on the first found + delimiter. Return two split parts and the matched delimiter. + + If not found, then the first part is the full input string. + + Example: :: + + >>> split_first('foo/bar?baz', '?/=') + ('foo', 'bar?baz', '/') + >>> split_first('foo/bar?baz', '123') + ('foo/bar?baz', '', None) + + Scales linearly with number of delims. Not ideal for large number of delims. + """ + min_idx = None + min_delim = None + for d in delims: + idx = s.find(d) + if idx < 0: + continue + + if min_idx is None or idx < min_idx: + min_idx = idx + min_delim = d + + if min_idx is None or min_idx < 0: + return s, '', None + + return s[:min_idx], s[min_idx+1:], min_delim + + +def parse_url(url): + """ + Given a url, return a parsed :class:`.Url` namedtuple. Best-effort is + performed to parse incomplete urls. Fields not provided will be None. + + Partly backwards-compatible with :mod:`urlparse`. + + Example: :: + + >>> parse_url('http://google.com/mail/') + Url(scheme='http', host='google.com', port=None, path='/', ...) + >>> prase_url('google.com:80') + Url(scheme=None, host='google.com', port=80, path=None, ...) + >>> prase_url('/foo?bar') + Url(scheme=None, host=None, port=None, path='/foo', query='bar', ...) + """ + + # While this code has overlap with stdlib's urlparse, it is much + # simplified for our needs and less annoying. + # Additionally, this imeplementations does silly things to be optimal + # on CPython. + + scheme = None + auth = None + host = None + port = None + path = None + fragment = None + query = None + + # Scheme + if '://' in url: + scheme, url = url.split('://', 1) + + # Find the earliest Authority Terminator + # (http://tools.ietf.org/html/rfc3986#section-3.2) + url, path_, delim = split_first(url, ['/', '?', '#']) + + if delim: + # Reassemble the path + path = delim + path_ + + # Auth + if '@' in url: + auth, url = url.split('@', 1) + + # IPv6 + if url and url[0] == '[': + host, url = url[1:].split(']', 1) + + # Port + if ':' in url: + _host, port = url.split(':', 1) + + if not host: + host = _host + + if not port.isdigit(): + raise LocationParseError("Failed to parse: %s" % url) + + port = int(port) + + elif not host and url: + host = url + + if not path: + return Url(scheme, auth, host, port, path, query, fragment) + + # Fragment + if '#' in path: + path, fragment = path.split('#', 1) + + # Query + if '?' in path: + path, query = path.split('?', 1) + + return Url(scheme, auth, host, port, path, query, fragment) + + +def get_host(url): + """ + Deprecated. Use :func:`.parse_url` instead. + """ + p = parse_url(url) + return p.scheme or 'http', p.hostname, p.port + + def make_headers(keep_alive=None, accept_encoding=None, user_agent=None, basic_auth=None): """ @@ -72,93 +220,12 @@ def make_headers(keep_alive=None, accept_encoding=None, user_agent=None, return headers -def split_first(s, delims): - """ - Given a string and an iterable of delimiters, split on the first found - delimiter. Return two split parts. - - If not found, then the first part is the full input string. - - Scales linearly with number of delims. Not ideal for large number of delims. - """ - min_idx = None - for d in delims: - idx = s.find(d) - if idx < 0: - continue - - if not min_idx: - min_idx = idx - else: - min_idx = min(idx, min_idx) - - if min_idx < 0: - return s, '' - - return s[:min_idx], s[min_idx+1:] - - -def get_host(url): - """ - Given a url, return its scheme, host and port (None if it's not there). - - For example: :: - - >>> get_host('http://google.com/mail/') - ('http', 'google.com', None) - >>> get_host('google.com:80') - ('http', 'google.com', 80) - """ - - # While this code has overlap with stdlib's urlparse, it is much - # simplified for our needs and less annoying. - # Additionally, this imeplementations does silly things to be optimal - # on CPython. - - scheme = 'http' - host = None - port = None - - # Scheme - if '://' in url: - scheme, url = url.split('://', 1) - - # Find the earliest Authority Terminator - # (http://tools.ietf.org/html/rfc3986#section-3.2) - url, _path = split_first(url, ['/', '?', '#']) - - # Auth - if '@' in url: - _auth, url = url.split('@', 1) - - # IPv6 - if url and url[0] == '[': - host, url = url[1:].split(']', 1) - - # Port - if ':' in url: - _host, port = url.split(':', 1) - - if not host: - host = _host - - if not port.isdigit(): - raise LocationParseError("Failed to parse: %s" % url) - - port = int(port) - - elif not host: - host = url - - return scheme, host, port - - def is_connection_dropped(conn): """ Returns True if the connection is dropped and should be closed. :param conn: - ``HTTPConnection`` object. + :class:`httplib.HTTPConnection` object. Note: For platforms like AppEngine, this will always return ``False`` to let the platform handle connection recycling transparently for us. @@ -171,7 +238,10 @@ def is_connection_dropped(conn): if not select: # Platform-specific: AppEngine return False - return select([sock], [], [], 0.0)[0] + try: + return select([sock], [], [], 0.0)[0] + except SocketError: + return True # This version is better on platforms that support it. p = poll()