urllib update and junks

This commit is contained in:
Kenneth Reitz
2012-07-27 01:32:01 -04:00
parent 0d9ab27b02
commit e02fb2eb6c
7 changed files with 343 additions and 235 deletions
+8
View File
@@ -3,6 +3,14 @@
History
-------
0.13.4 (2012-xx-xx)
+++++++++++++++++++
- Fix leaking connections (from urllib3 update)
- OAuthlib path hack fi
- App Engine 2.7 Fixes!
0.13.3 (2012-07-12)
+++++++++++++++++++
+67 -104
View File
@@ -4,128 +4,91 @@
# This module is part of urllib3 and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
from collections import deque
from collections import MutableMapping
from threading import Lock
try: # Python 2.7+
from collections import OrderedDict
except ImportError:
from .packages.ordered_dict import OrderedDict
from threading import RLock
__all__ = ['RecentlyUsedContainer']
class AccessEntry(object):
__slots__ = ('key', 'is_valid')
def __init__(self, key, is_valid=True):
self.key = key
self.is_valid = is_valid
_Null = object()
class RecentlyUsedContainer(dict):
class RecentlyUsedContainer(MutableMapping):
"""
Provides a dict-like that maintains up to ``maxsize`` keys while throwing
away the least-recently-used keys beyond ``maxsize``.
Provides a thread-safe dict-like container which maintains up to
``maxsize`` keys while throwing away the least-recently-used keys beyond
``maxsize``.
:param maxsize:
Maximum number of recent elements to retain.
:param dispose_func:
Every time an item is evicted from the container,
``dispose_func(value)`` is called. Callback which will get called
"""
# If len(self.access_log) exceeds self._maxsize * CLEANUP_FACTOR, then we
# will attempt to cleanup the invalidated entries in the access_log
# datastructure during the next 'get' operation.
CLEANUP_FACTOR = 10
ContainerCls = OrderedDict
def __init__(self, maxsize=10):
def __init__(self, maxsize=10, dispose_func=None):
self._maxsize = maxsize
self.dispose_func = dispose_func
self._container = {}
# We use a deque to to store our keys ordered by the last access.
self.access_log = deque()
self.access_log_lock = RLock()
# We look up the access log entry by the key to invalidate it so we can
# insert a new authorative entry at the head without having to dig and
# find the old entry for removal immediately.
self.access_lookup = {}
# Trigger a heap cleanup when we get past this size
self.access_log_limit = maxsize * self.CLEANUP_FACTOR
def _invalidate_entry(self, key):
"If exists: Invalidate old entry and return it."
old_entry = self.access_lookup.get(key)
if old_entry:
old_entry.is_valid = False
return old_entry
def _push_entry(self, key):
"Push entry onto our access log, invalidate the old entry if exists."
self._invalidate_entry(key)
new_entry = AccessEntry(key)
self.access_lookup[key] = new_entry
self.access_log_lock.acquire()
self.access_log.appendleft(new_entry)
self.access_log_lock.release()
def _prune_entries(self, num):
"Pop entries from our access log until we popped ``num`` valid ones."
while num > 0:
self.access_log_lock.acquire()
p = self.access_log.pop()
self.access_log_lock.release()
if not p.is_valid:
continue # Invalidated entry, skip
dict.pop(self, p.key, None)
self.access_lookup.pop(p.key, None)
num -= 1
def _prune_invalidated_entries(self):
"Rebuild our access_log without the invalidated entries."
self.access_log_lock.acquire()
self.access_log = deque(e for e in self.access_log if e.is_valid)
self.access_log_lock.release()
def _get_ordered_access_keys(self):
"Return ordered access keys for inspection. Used for testing."
self.access_log_lock.acquire()
r = [e.key for e in self.access_log if e.is_valid]
self.access_log_lock.release()
return r
self._container = self.ContainerCls()
self._lock = Lock()
def __getitem__(self, key):
item = dict.get(self, key)
# Re-insert the item, moving it to the end of the eviction line.
with self._lock:
item = self._container.pop(key)
self._container[key] = item
return item
if not item:
raise KeyError(key)
def __setitem__(self, key, value):
evicted_value = _Null
with self._lock:
# Possibly evict the existing value of 'key'
evicted_value = self._container.get(key, _Null)
self._container[key] = value
# Insert new entry with new high priority, also implicitly invalidates
# the old entry.
self._push_entry(key)
# If we didn't evict an existing value, we might have to evict the
# least recently used item from the beginning of the container.
if len(self._container) > self._maxsize:
_key, evicted_value = self._container.popitem(last=False)
if len(self.access_log) > self.access_log_limit:
# Heap is getting too big, try to clean up any tailing invalidated
# entries.
self._prune_invalidated_entries()
return item
def __setitem__(self, key, item):
# Add item to our container and access log
dict.__setitem__(self, key, item)
self._push_entry(key)
# Discard invalid and excess entries
self._prune_entries(len(self) - self._maxsize)
if self.dispose_func and evicted_value is not _Null:
self.dispose_func(evicted_value)
def __delitem__(self, key):
self._invalidate_entry(key)
self.access_lookup.pop(key, None)
dict.__delitem__(self, key)
with self._lock:
value = self._container.pop(key)
def get(self, key, default=None):
try:
return self[key]
except KeyError:
return default
if self.dispose_func:
self.dispose_func(value)
def __len__(self):
with self._lock:
return len(self._container)
def __iter__(self):
raise NotImplementedError('Iteration over this class is unlikely to be threadsafe.')
def clear(self):
with self._lock:
# Copy pointers to all values, then wipe the mapping
# under Python 2, this copies the list of values twice :-|
values = list(self._container.values())
self._container.clear()
if self.dispose_func:
for value in values:
self.dispose_func(value)
def keys(self):
with self._lock:
return self._container.keys()
+59 -22
View File
@@ -7,27 +7,27 @@
import logging
import socket
from socket import error as SocketError, timeout as SocketTimeout
from socket import timeout as SocketTimeout
try: # Python 3
try: # Python 3
from http.client import HTTPConnection, HTTPException
from http.client import HTTP_PORT, HTTPS_PORT
except ImportError:
from httplib import HTTPConnection, HTTPException
from httplib import HTTP_PORT, HTTPS_PORT
try: # Python 3
try: # Python 3
from queue import LifoQueue, Empty, Full
except ImportError:
from Queue import LifoQueue, Empty, Full
try: # Compiled with SSL?
try: # Compiled with SSL?
HTTPSConnection = object
BaseSSLError = None
ssl = None
try: # Python 3
try: # Python 3
from http.client import HTTPSConnection
except ImportError:
from httplib import HTTPSConnection
@@ -35,7 +35,7 @@ try: # Compiled with SSL?
import ssl
BaseSSLError = ssl.SSLError
except (ImportError, AttributeError):
except (ImportError, AttributeError): # Platform-specific: No SSL.
pass
@@ -43,6 +43,7 @@ from .request import RequestMethods
from .response import HTTPResponse
from .util import get_host, is_connection_dropped
from .exceptions import (
ClosedPoolError,
EmptyPoolError,
HostChangedError,
MaxRetryError,
@@ -206,10 +207,8 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods):
try:
conn = self.pool.get(block=self.block, timeout=timeout)
# If this is a persistent connection, check if it got disconnected
if conn and is_connection_dropped(conn):
log.info("Resetting dropped connection: %s" % self.host)
conn.close()
except AttributeError: # self.pool is None
raise ClosedPoolError(self, "Pool is closed.")
except Empty:
if self.block:
@@ -218,6 +217,11 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods):
"connections are allowed.")
pass # Oh well, we'll create a new connection then
# If this is a persistent connection, check if it got disconnected
if conn and is_connection_dropped(conn):
log.info("Resetting dropped connection: %s" % self.host)
conn.close()
return conn or self._new_conn()
def _put_conn(self, conn):
@@ -228,17 +232,26 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods):
Connection object for the current host and port as returned by
:meth:`._new_conn` or :meth:`._get_conn`.
If the pool is already full, the connection is discarded because we
exceeded maxsize. If connections are discarded frequently, then maxsize
should be increased.
If the pool is already full, the connection is closed and discarded
because we exceeded maxsize. If connections are discarded frequently,
then maxsize should be increased.
If the pool is closed, then the connection will be closed and discarded.
"""
try:
self.pool.put(conn, block=False)
return # Everything is dandy, done.
except AttributeError:
# self.pool is None.
pass
except Full:
# This should never happen if self.block == True
log.warning("HttpConnectionPool is full, discarding connection: %s"
% self.host)
# Connection never got put back into the pool, close it.
conn.close()
def _make_request(self, conn, method, url, timeout=_Default,
**httplib_request_kw):
"""
@@ -268,15 +281,32 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods):
log.debug("\"%s %s %s\" %s %s" % (method, url, http_version,
httplib_response.status,
httplib_response.length))
return httplib_response
def close(self):
"""
Close all pooled connections and disable the pool.
"""
# Disable access to the pool
old_pool, self.pool = self.pool, None
try:
while True:
conn = old_pool.get(block=False)
if conn:
conn.close()
except Empty:
pass # Done.
def is_same_host(self, url):
"""
Check if the given ``url`` is a member of the same host as this
connection pool.
"""
if url.startswith('/'):
return True
# TODO: Add optional support for socket.gethostbyname checking.
scheme, host, port = get_host(url)
@@ -284,8 +314,7 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods):
# Use explicit default port for comparison when none is given.
port = port_by_scheme.get(scheme)
return (url.startswith('/') or
(scheme, host, port) == (self.scheme, self.host, self.port))
return (scheme, host, port) == (self.scheme, self.host, self.port)
def urlopen(self, method, url, body=None, headers=None, retries=3,
redirect=True, assert_same_host=True, timeout=_Default,
@@ -378,7 +407,6 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods):
try:
# Request a connection from the queue
# (Could raise SocketError: Bad file descriptor)
conn = self._get_conn(timeout=pool_timeout)
# Make the request on the httplib connection object
@@ -421,29 +449,38 @@ class HTTPConnectionPool(ConnectionPool, RequestMethods):
# Name mismatch
raise SSLError(e)
except (HTTPException, SocketError) as e:
except HTTPException as e:
# Connection broken, discard. It will be replaced next _get_conn().
conn = None
# This is necessary so we can access e below
err = e
finally:
if conn and release_conn:
# Put the connection back to be reused
if release_conn:
# Put the connection back to be reused. If the connection is
# expired then it will be None, which will get replaced with a
# fresh connection during _get_conn.
self._put_conn(conn)
if not conn:
# Try again
log.warn("Retrying (%d attempts remain) after connection "
"broken by '%r': %s" % (retries, err, url))
return self.urlopen(method, url, body, headers, retries - 1,
redirect, assert_same_host) # Try again
redirect, assert_same_host,
timeout=timeout, pool_timeout=pool_timeout,
release_conn=release_conn, **response_kw)
# Handle redirect?
redirect_location = redirect and response.get_redirect_location()
if redirect_location:
if response.status == 303:
method = 'GET'
log.info("Redirecting %s -> %s" % (url, redirect_location))
return self.urlopen(method, redirect_location, body, headers,
retries - 1, redirect, assert_same_host)
retries - 1, redirect, assert_same_host,
timeout=timeout, pool_timeout=pool_timeout,
release_conn=release_conn, **response_kw)
return response
+10
View File
@@ -24,6 +24,11 @@ class SSLError(HTTPError):
pass
class DecodeError(HTTPError):
"Raised when automatic decoding based on Content-Type fails."
pass
## Leaf Exceptions
class MaxRetryError(PoolError):
@@ -57,6 +62,11 @@ class EmptyPoolError(PoolError):
pass
class ClosedPoolError(PoolError):
"Raised when a request enters a pool after the pool has been closed."
pass
class LocationParseError(ValueError, HTTPError):
"Raised when get_host or similar fails to parse the URL input."
+42 -22
View File
@@ -8,9 +8,9 @@ import logging
from ._collections import RecentlyUsedContainer
from .connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from .connectionpool import get_host, connection_from_url, port_by_scheme
from .exceptions import HostChangedError
from .connectionpool import connection_from_url, port_by_scheme
from .request import RequestMethods
from .util import parse_url
__all__ = ['PoolManager', 'ProxyManager', 'proxy_from_url']
@@ -48,19 +48,29 @@ class PoolManager(RequestMethods):
"""
# TODO: Make sure there are no memory leaks here.
def __init__(self, num_pools=10, **connection_pool_kw):
self.connection_pool_kw = connection_pool_kw
self.pools = RecentlyUsedContainer(num_pools)
self.pools = RecentlyUsedContainer(num_pools,
dispose_func=lambda p: p.close())
def connection_from_host(self, host, port=80, scheme='http'):
def clear(self):
"""
Empty our store of pools and direct them all to close.
This will not affect in-flight connections, but they will not be
re-used after completion.
"""
self.pools.clear()
def connection_from_host(self, host, port=None, scheme='http'):
"""
Get a :class:`ConnectionPool` based on the host, port, and scheme.
Note that an appropriate ``port`` value is required here to normalize
connection pools in our container most effectively.
If ``port`` isn't given, it will be derived from the ``scheme`` using
``urllib3.connectionpool.port_by_scheme``.
"""
port = port or port_by_scheme.get(scheme, 80)
pool_key = (scheme, host, port)
# If the scheme, host, or port doesn't match existing open connections,
@@ -86,26 +96,36 @@ class PoolManager(RequestMethods):
Additional parameters are taken from the :class:`.PoolManager`
constructor.
"""
scheme, host, port = get_host(url)
u = parse_url(url)
return self.connection_from_host(u.host, port=u.port, scheme=u.scheme)
port = port or port_by_scheme.get(scheme, 80)
return self.connection_from_host(host, port=port, scheme=scheme)
def urlopen(self, method, url, **kw):
def urlopen(self, method, url, redirect=True, **kw):
"""
Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`.
Same as :meth:`urllib3.connectionpool.HTTPConnectionPool.urlopen`
with custom cross-host redirect logic and only sends the request-uri
portion of the ``url``.
``url`` must be absolute, such that an appropriate
The given ``url`` parameter must be absolute, such that an appropriate
:class:`urllib3.connectionpool.ConnectionPool` can be chosen for it.
"""
conn = self.connection_from_url(url)
try:
return conn.urlopen(method, url, **kw)
u = parse_url(url)
conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme)
except HostChangedError as e:
kw['retries'] = e.retries # Persist retries countdown
return self.urlopen(method, e.url, **kw)
kw['assert_same_host'] = False
kw['redirect'] = False
response = conn.urlopen(method, u.request_uri, **kw)
redirect_location = redirect and response.get_redirect_location()
if not redirect_location:
return response
if response.status == 303:
method = 'GET'
log.info("Redirecting %s -> %s" % (url, redirect_location))
kw['retries'] = kw.get('retries', 3) - 1 # Persist retries countdown
return self.urlopen(method, redirect_location, **kw)
class ProxyManager(RequestMethods):
+4 -4
View File
@@ -10,7 +10,7 @@ import zlib
from io import BytesIO
from .exceptions import HTTPError
from .exceptions import DecodeError
from .packages.six import string_types as basestring
@@ -148,9 +148,9 @@ class HTTPResponse(object):
try:
if decode_content and decoder:
data = decoder(data)
except IOError:
raise HTTPError("Received response with content-encoding: %s, but "
"failed to decode it." % content_encoding)
except (IOError, zlib.error):
raise DecodeError("Received response with content-encoding: %s, but "
"failed to decode it." % content_encoding)
if cache_content:
self._body = data
+153 -83
View File
@@ -6,6 +6,8 @@
from base64 import b64encode
from collections import namedtuple
from socket import error as SocketError
try:
from select import poll, POLLIN
@@ -20,6 +22,152 @@ from .packages import six
from .exceptions import LocationParseError
class Url(namedtuple('Url', ['scheme', 'auth', 'host', 'port', 'path', 'query', 'fragment'])):
"""
Datastructure for representing an HTTP URL. Used as a return value for
:func:`parse_url`.
"""
slots = ()
def __new__(cls, scheme=None, auth=None, host=None, port=None, path=None, query=None, fragment=None):
return super(Url, cls).__new__(cls, scheme, auth, host, port, path, query, fragment)
@property
def hostname(self):
"""For backwards-compatibility with urlparse. We're nice like that."""
return self.host
@property
def request_uri(self):
"""Absolute path including the query string."""
uri = self.path or '/'
if self.query is not None:
uri += '?' + self.query
return uri
def split_first(s, delims):
"""
Given a string and an iterable of delimiters, split on the first found
delimiter. Return two split parts and the matched delimiter.
If not found, then the first part is the full input string.
Example: ::
>>> split_first('foo/bar?baz', '?/=')
('foo', 'bar?baz', '/')
>>> split_first('foo/bar?baz', '123')
('foo/bar?baz', '', None)
Scales linearly with number of delims. Not ideal for large number of delims.
"""
min_idx = None
min_delim = None
for d in delims:
idx = s.find(d)
if idx < 0:
continue
if min_idx is None or idx < min_idx:
min_idx = idx
min_delim = d
if min_idx is None or min_idx < 0:
return s, '', None
return s[:min_idx], s[min_idx+1:], min_delim
def parse_url(url):
"""
Given a url, return a parsed :class:`.Url` namedtuple. Best-effort is
performed to parse incomplete urls. Fields not provided will be None.
Partly backwards-compatible with :mod:`urlparse`.
Example: ::
>>> parse_url('http://google.com/mail/')
Url(scheme='http', host='google.com', port=None, path='/', ...)
>>> prase_url('google.com:80')
Url(scheme=None, host='google.com', port=80, path=None, ...)
>>> prase_url('/foo?bar')
Url(scheme=None, host=None, port=None, path='/foo', query='bar', ...)
"""
# While this code has overlap with stdlib's urlparse, it is much
# simplified for our needs and less annoying.
# Additionally, this imeplementations does silly things to be optimal
# on CPython.
scheme = None
auth = None
host = None
port = None
path = None
fragment = None
query = None
# Scheme
if '://' in url:
scheme, url = url.split('://', 1)
# Find the earliest Authority Terminator
# (http://tools.ietf.org/html/rfc3986#section-3.2)
url, path_, delim = split_first(url, ['/', '?', '#'])
if delim:
# Reassemble the path
path = delim + path_
# Auth
if '@' in url:
auth, url = url.split('@', 1)
# IPv6
if url and url[0] == '[':
host, url = url[1:].split(']', 1)
# Port
if ':' in url:
_host, port = url.split(':', 1)
if not host:
host = _host
if not port.isdigit():
raise LocationParseError("Failed to parse: %s" % url)
port = int(port)
elif not host and url:
host = url
if not path:
return Url(scheme, auth, host, port, path, query, fragment)
# Fragment
if '#' in path:
path, fragment = path.split('#', 1)
# Query
if '?' in path:
path, query = path.split('?', 1)
return Url(scheme, auth, host, port, path, query, fragment)
def get_host(url):
"""
Deprecated. Use :func:`.parse_url` instead.
"""
p = parse_url(url)
return p.scheme or 'http', p.hostname, p.port
def make_headers(keep_alive=None, accept_encoding=None, user_agent=None,
basic_auth=None):
"""
@@ -72,93 +220,12 @@ def make_headers(keep_alive=None, accept_encoding=None, user_agent=None,
return headers
def split_first(s, delims):
"""
Given a string and an iterable of delimiters, split on the first found
delimiter. Return two split parts.
If not found, then the first part is the full input string.
Scales linearly with number of delims. Not ideal for large number of delims.
"""
min_idx = None
for d in delims:
idx = s.find(d)
if idx < 0:
continue
if not min_idx:
min_idx = idx
else:
min_idx = min(idx, min_idx)
if min_idx < 0:
return s, ''
return s[:min_idx], s[min_idx+1:]
def get_host(url):
"""
Given a url, return its scheme, host and port (None if it's not there).
For example: ::
>>> get_host('http://google.com/mail/')
('http', 'google.com', None)
>>> get_host('google.com:80')
('http', 'google.com', 80)
"""
# While this code has overlap with stdlib's urlparse, it is much
# simplified for our needs and less annoying.
# Additionally, this imeplementations does silly things to be optimal
# on CPython.
scheme = 'http'
host = None
port = None
# Scheme
if '://' in url:
scheme, url = url.split('://', 1)
# Find the earliest Authority Terminator
# (http://tools.ietf.org/html/rfc3986#section-3.2)
url, _path = split_first(url, ['/', '?', '#'])
# Auth
if '@' in url:
_auth, url = url.split('@', 1)
# IPv6
if url and url[0] == '[':
host, url = url[1:].split(']', 1)
# Port
if ':' in url:
_host, port = url.split(':', 1)
if not host:
host = _host
if not port.isdigit():
raise LocationParseError("Failed to parse: %s" % url)
port = int(port)
elif not host:
host = url
return scheme, host, port
def is_connection_dropped(conn):
"""
Returns True if the connection is dropped and should be closed.
:param conn:
``HTTPConnection`` object.
:class:`httplib.HTTPConnection` object.
Note: For platforms like AppEngine, this will always return ``False`` to
let the platform handle connection recycling transparently for us.
@@ -171,7 +238,10 @@ def is_connection_dropped(conn):
if not select: # Platform-specific: AppEngine
return False
return select([sock], [], [], 0.0)[0]
try:
return select([sock], [], [], 0.0)[0]
except SocketError:
return True
# This version is better on platforms that support it.
p = poll()