urllib3 updates

This commit is contained in:
Kenneth Reitz
2011-09-25 16:05:14 -04:00
parent df66fa7975
commit 0db4dab03e
7 changed files with 492 additions and 173 deletions
+14 -10
View File
@@ -2,21 +2,25 @@
urllib3 - Thread-safe connection pooling and re-using.
"""
from connectionpool import (
connection_from_url,
get_host,
__author__ = "Andrey Petrov (andrey.petrov@shazow.net)"
__license__ = "MIT"
__version__ = "$Rev$"
from .connectionpool import (
HTTPConnectionPool,
HTTPSConnectionPool,
connection_from_url,
get_host,
make_headers)
# Possible exceptions
from connectionpool import (
from .exceptions import (
HTTPError,
MaxRetryError,
SSLError,
TimeoutError)
from filepost import encode_multipart_formdata
__author__ = "Andrey Petrov (andrey.petrov@shazow.net)"
__license__ = "MIT"
__version__ = "$Rev$"
from .poolmanager import PoolManager
from .response import HTTPResponse
from .filepost import encode_multipart_formdata
+115
View File
@@ -0,0 +1,115 @@
from collections import MutableMapping, deque
__all__ = ['RecentlyUsedContainer']
class AccessEntry(object):
    """
    One record in a RecentlyUsedContainer's access log.

    ``is_valid`` is set to False once a newer access of the same key has been
    logged (or the key has been deleted), so that stale records can be
    skipped instead of being searched for and removed immediately.
    """
    __slots__ = ('key', 'is_valid')

    def __init__(self, key, is_valid=True):
        self.key = key
        self.is_valid = is_valid


class RecentlyUsedContainer(MutableMapping):
    """
    Provides a dict-like that maintains up to ``maxsize`` keys while throwing
    away the least-recently-used keys beyond ``maxsize``.

    Reading a key with ``container[key]`` counts as a use of that key.
    Looking up a missing key raises ``KeyError``, so the inherited
    ``get(key, default)`` honours its default.
    """

    # TODO: Make this threadsafe. _prune_invalidated_entries should be the
    # only real pain-point for this.

    # If len(self.access_log) exceeds self._maxsize * CLEANUP_FACTOR, then we
    # will attempt to cleanup the invalidated entries in the access_log
    # datastructure during the next 'get' operation.
    CLEANUP_FACTOR = 10

    def __init__(self, maxsize=10):
        self._maxsize = maxsize

        self._container = {}

        # We use a deque to store our keys ordered by the last access.
        self.access_log = deque()

        # We look up the access log entry by the key to invalidate it so we
        # can insert a new authoritative entry at the head without having to
        # dig and find the old entry for removal immediately.
        self.access_lookup = {}

        # Trigger an access log cleanup when we get past this size
        self.access_log_limit = maxsize * self.CLEANUP_FACTOR

    def _push_entry(self, key):
        "Push entry onto our access log, invalidate the old entry if exists."
        # Invalidate old entry if it exists
        old_entry = self.access_lookup.get(key)
        if old_entry is not None:
            old_entry.is_valid = False

        new_entry = AccessEntry(key)
        self.access_lookup[key] = new_entry
        self.access_log.appendleft(new_entry)

    def _prune_entries(self, num):
        "Pop entries from our access log until we popped ``num`` valid ones."
        # Also stop when the log runs dry so that asking for more entries
        # than exist cannot raise IndexError from deque.pop().
        while num > 0 and self.access_log:
            p = self.access_log.pop()

            if not p.is_valid:
                continue  # Invalidated entry, skip

            del self._container[p.key]
            del self.access_lookup[p.key]
            num -= 1

    def _prune_invalidated_entries(self):
        "Rebuild our access_log without the invalidated entries."
        self.access_log = deque(e for e in self.access_log if e.is_valid)

    def _get_ordered_access_keys(self):
        "Return the valid keys, most recently used first (used for testing)."
        return [e.key for e in self.access_log if e.is_valid]

    def __getitem__(self, key):
        # Plain indexing raises KeyError for unknown keys, as the mapping
        # protocol requires, and does not mistake falsy values for misses.
        item = self._container[key]

        # Insert new entry with new high priority, also implicitly invalidates
        # the old entry.
        self._push_entry(key)

        if len(self.access_log) > self.access_log_limit:
            # Access log is getting too big, drop the invalidated entries.
            self._prune_invalidated_entries()

        return item

    def __setitem__(self, key, item):
        # Add item to our container and access log
        self._container[key] = item
        self._push_entry(key)

        # Discard invalid and excess entries
        self._prune_entries(len(self._container) - self._maxsize)

    def __delitem__(self, key):
        # Raises KeyError for unknown keys before any bookkeeping is touched.
        del self._container[key]
        # The log record stays in the deque but is marked stale so that
        # pruning skips it.
        self.access_lookup.pop(key).is_valid = False

    def __len__(self):
        # Count stored items, not log records: the log also holds stale ones.
        return len(self._container)

    def __iter__(self):
        return iter(self._container)

    def __contains__(self, key):
        return key in self._container
+88 -156
View File
@@ -1,5 +1,3 @@
import gzip
import zlib
import logging
import socket
@@ -14,122 +12,25 @@ from socket import error as SocketError, timeout as SocketTimeout
try:
import ssl
BaseSSLError = ssl.SSLError
except ImportError, e:
except ImportError:
ssl = None
BaseSSLError = None
try:
from cStringIO import StringIO
except ImportError, e:
from StringIO import StringIO
from filepost import encode_multipart_formdata
from .filepost import encode_multipart_formdata
from .response import HTTPResponse
from .exceptions import (
SSLError,
MaxRetryError,
TimeoutError,
HostChangedError,
EmptyPoolError)
log = logging.getLogger(__name__)
## Exceptions
class HTTPError(Exception):
"Base exception used by this module."
pass
class SSLError(Exception):
"Raised when SSL certificate fails in an HTTPS connection."
pass
class MaxRetryError(HTTPError):
"Raised when the maximum number of retries is exceeded."
pass
class TimeoutError(HTTPError):
"Raised when a socket timeout occurs."
pass
class HostChangedError(HTTPError):
"Raised when an existing pool gets a request for a foreign host."
pass
## Response objects
class HTTPResponse(object):
"""
HTTP Response container.
Similar to httplib's HTTPResponse but the data is pre-loaded.
"""
def __init__(self, data='', headers=None, status=0, version=0, reason=None,
strict=0):
self.data = data
self.headers = headers or {}
self.status = status
self.version = version
self.reason = reason
self.strict = strict
@staticmethod
def from_httplib(r, block=True):
"""
Given an httplib.HTTPResponse instance, return a corresponding
urllib3.HTTPResponse object.
NOTE: This method will perform r.read() which will have side effects
on the original http.HTTPResponse object.
"""
if block:
tmp_data = r.read()
try:
if r.getheader('content-encoding') == 'gzip':
log.debug("Received response with content-encoding: gzip, "
"decompressing with gzip.")
gzipper = gzip.GzipFile(fileobj=StringIO(tmp_data))
data = gzipper.read()
elif r.getheader('content-encoding') == 'deflate':
log.debug("Received response with content-encoding: deflate, "
"decompressing with zlib.")
try:
data = zlib.decompress(tmp_data)
except zlib.error, e:
data = zlib.decompress(tmp_data, -zlib.MAX_WBITS)
else:
data = tmp_data
except IOError:
raise HTTPError("Received response with content-encoding: %s, "
"but failed to decompress it." %
(r.getheader('content-encoding')))
else:
data = None
resp = HTTPResponse(data=data,
headers=dict(r.getheaders()),
status=r.status,
version=r.version,
reason=r.reason,
strict=r.strict)
resp._raw = r
return resp
# Backwards-compatibility methods for httplib.HTTPResponse
def getheaders(self):
return self.headers
def getheader(self, name, default=None):
return self.headers.get(name, default)
## Connection objects
## Connection objects (extension of httplib)
class VerifiedHTTPSConnection(HTTPSConnection):
"""
@@ -137,8 +38,13 @@ class VerifiedHTTPSConnection(HTTPSConnection):
SSL certification.
"""
def set_cert(self, key_file=None, cert_file=None, cert_reqs='CERT_NONE',
ca_certs=None):
def __init__(self, *args, **kwargs):
    """
    Create the connection, forwarding every argument (host, port, ...) to
    ``HTTPSConnection.__init__``.

    ``cert_reqs`` and ``ca_certs`` start out as None; ``set_cert`` accepts
    parameters of the same names.
    """
    # The base initialiser must receive ``self`` and the caller's arguments;
    # calling it with neither raises TypeError.
    HTTPSConnection.__init__(self, *args, **kwargs)
    self.cert_reqs = None
    self.ca_certs = None
def set_cert(self, key_file=None, cert_file=None,
cert_reqs='CERT_NONE', ca_certs=None):
ssl_req_scheme = {
'CERT_NONE': ssl.CERT_NONE,
'CERT_OPTIONAL': ssl.CERT_OPTIONAL,
@@ -163,7 +69,11 @@ class VerifiedHTTPSConnection(HTTPSConnection):
## Pool objects
class HTTPConnectionPool(object):
class ConnectionPool(object):
    """Empty base class shared by the connection pool implementations."""
class HTTPConnectionPool(ConnectionPool):
"""
Thread-safe connection pool for one host.
@@ -215,8 +125,10 @@ class HTTPConnectionPool(object):
self.headers = headers or {}
# Fill the queue up so that doing get() on it will block properly
[self.pool.put(None) for i in xrange(maxsize)]
for _ in xrange(maxsize):
self.pool.put(None)
# These are mostly for testing and debugging purposes.
self.num_connections = 0
self.num_requests = 0
@@ -241,11 +153,13 @@ class HTTPConnectionPool(object):
# If this is a persistent connection, check if it got disconnected
if conn and conn.sock and select([conn.sock], [], [], 0.0)[0]:
# Either data is buffered (bad), or the connection is dropped.
log.warning("Connection pool detected dropped "
"connection, resetting: %s" % self.host)
log.info("Resetting dropped connection: %s" % self.host)
conn.close()
except Empty, e:
except Empty:
if self.block:
raise EmptyPoolError("Pool reached maximum size and no more "
"connections are allowed.")
pass # Oh well, we'll create a new connection then
return conn or self._new_conn()
@@ -259,17 +173,37 @@ class HTTPConnectionPool(object):
"""
try:
self.pool.put(conn, block=False)
except Full, e:
except Full:
# This should never happen if self.block == True
log.warning("HttpConnectionPool is full, discarding connection: %s"
% self.host)
def _make_request(self, conn, method, url, **httplib_request_kw):
    """
    Send a single request over ``conn``, an httplib connection object taken
    from our pool, and return the httplib response.

    Extra keyword arguments are passed straight to ``conn.request``.
    """
    self.num_requests += 1

    conn.request(method, url, **httplib_request_kw)
    # NOTE(review): the timeout is applied after request(); presumably the
    # socket only exists once the request has been sent -- confirm.
    conn.sock.settimeout(self.timeout)
    response = conn.getresponse()

    http_version = conn._http_vsn_str  # pylint: disable-msg=W0212
    log.debug("\"%s %s %s\" %s %s" %
              (method, url, http_version, response.status, response.length))
    return response
def is_same_host(self, url):
    """
    Return True if ``url`` is a path-only URL (starts with '/') or if
    ``get_host(url)`` equals this pool's (scheme, host, port).
    """
    if url.startswith('/'):
        return True
    return get_host(url) == (self.scheme, self.host, self.port)
def urlopen(self, method, url, body=None, headers=None, retries=3,
redirect=True, assert_same_host=True, block=True):
redirect=True, assert_same_host=True, pool_timeout=None,
**response_kw):
"""
Get a connection from the pool and perform an HTTP request.
@@ -298,8 +232,16 @@ class HTTPConnectionPool(object):
If True, will make sure that the host of the pool requests is
consistent else will raise HostChangedError. When False, you can
use the pool on an HTTP proxy and request foreign hosts.
pool_timeout
If set and the pool is set to block=True, then this method will
block for ``pool_timeout`` seconds and raise EmptyPoolError if no
connection is available within the time period.
Additional parameters are passed to
``HTTPResponse.from_httplib(r, **response_kw)``
"""
if headers == None:
if headers is None:
headers = self.headers
if retries < 0:
@@ -316,24 +258,20 @@ class HTTPConnectionPool(object):
try:
# Request a connection from the queue
conn = self._get_conn()
conn = self._get_conn(timeout=pool_timeout)
# Make the request
self.num_requests += 1
conn.request(method, url, body=body, headers=headers)
conn.sock.settimeout(self.timeout)
httplib_response = conn.getresponse()
log.debug("\"%s %s %s\" %s %s" %
(method, url, conn._http_vsn_str,
httplib_response.status, httplib_response.length))
# Make the request on the httplib connection object
httplib_response = self._make_request(conn, method, url,
body=body, headers=headers)
# from_httplib will perform httplib_response.read() which will have
# the side effect of letting us use this connection for another
# request.
response = HTTPResponse.from_httplib(httplib_response, block=block)
# Import httplib's response into our own wrapper object
response = HTTPResponse.from_httplib(httplib_response,
pool=self,
connection=conn,
**response_kw)
# Put the connection back to be reused
self._put_conn(conn)
# The connection will be put back into the pool when
# response.release_conn() is called (implicitly by response.read())
except (SocketTimeout, Empty), e:
# Timed out either by socket or queue
@@ -363,7 +301,7 @@ class HTTPConnectionPool(object):
return response
def get_url(self, url, fields=None, headers=None, retries=3,
redirect=True):
redirect=True, **response_kw):
"""
Wrapper for performing GET with urlopen (see urlopen for more details).
@@ -373,10 +311,11 @@ class HTTPConnectionPool(object):
if fields:
url += '?' + urlencode(fields)
return self.urlopen('GET', url, headers=headers, retries=retries,
redirect=redirect)
redirect=redirect, **response_kw)
def post_url(self, url, fields=None, headers=None, retries=3,
redirect=True, encode_multipart=True):
redirect=True, encode_multipart=True, multipart_boundary=None,
**response_kw):
"""
Wrapper for performing POST with urlopen (see urlopen
for more details).
@@ -404,7 +343,8 @@ class HTTPConnectionPool(object):
which is used to compose the body of the request.
"""
if encode_multipart:
body, content_type = encode_multipart_formdata(fields or {})
body, content_type = encode_multipart_formdata(fields or {},
boundary=multipart_boundary)
else:
body, content_type = (
urlencode(fields or {}),
@@ -414,7 +354,7 @@ class HTTPConnectionPool(object):
headers.update({'Content-Type': content_type})
return self.urlopen('POST', url, body, headers=headers,
retries=retries, redirect=redirect)
retries=retries, redirect=redirect, **response_kw)
class HTTPSConnectionPool(HTTPConnectionPool):
@@ -424,28 +364,20 @@ class HTTPSConnectionPool(HTTPConnectionPool):
scheme = 'https'
def __init__(self, host, port=None, strict=False, timeout=None, maxsize=1,
block=False, headers=None, key_file=None,
cert_file=None, cert_reqs='CERT_NONE', ca_certs=None):
self.host = host
self.port = port
self.strict = strict
self.timeout = timeout
self.pool = Queue(maxsize)
self.block = block
self.headers = headers or {}
def __init__(self, host, port=None,
strict=False, timeout=None, maxsize=1,
block=False, headers=None,
key_file=None, cert_file=None,
cert_reqs='CERT_NONE', ca_certs=None):
super(HTTPSConnectionPool, self).__init__(host, port,
strict, timeout, maxsize,
block, headers)
self.key_file = key_file
self.cert_file = cert_file
self.cert_reqs = cert_reqs
self.ca_certs = ca_certs
# Fill the queue up so that doing get() on it will block properly
[self.pool.put(None) for i in xrange(maxsize)]
self.num_connections = 0
self.num_requests = 0
def _new_conn(self):
"""
Return a fresh HTTPSConnection.
@@ -527,7 +459,7 @@ def get_host(url):
if '//' in url:
scheme, url = url.split('://', 1)
if '/' in url:
url, path = url.split('/', 1)
url, _path = url.split('/', 1)
if ':' in url:
url, port = url.split(':', 1)
port = int(port)
+29
View File
@@ -0,0 +1,29 @@
## Exceptions
class HTTPError(Exception):
    """Base exception used by this module."""


# Derives from HTTPError like every other exception here, so that
# ``except HTTPError`` catches all errors raised by this package.
class SSLError(HTTPError):
    """Raised when SSL certificate fails in an HTTPS connection."""


class MaxRetryError(HTTPError):
    """Raised when the maximum number of retries is exceeded."""


class TimeoutError(HTTPError):
    """Raised when a socket timeout occurs."""


class HostChangedError(HTTPError):
    """Raised when an existing pool gets a request for a foreign host."""


class EmptyPoolError(HTTPError):
    """Raised when a pool runs out of connections and no more are allowed."""
+9 -7
View File
@@ -1,10 +1,11 @@
import codecs
import mimetools
import mimetypes
try:
from cStringIO import StringIO
except:
from StringIO import StringIO
except ImportError:
from StringIO import StringIO # pylint: disable-msg=W0404
writer = codecs.lookup('utf-8')[3]
@@ -14,12 +15,13 @@ def get_content_type(filename):
return mimetypes.guess_type(filename)[0] or 'application/octet-stream'
def encode_multipart_formdata(fields):
def encode_multipart_formdata(fields, boundary=None):
body = StringIO()
BOUNDARY = mimetools.choose_boundary()
if boundary is None:
boundary = mimetools.choose_boundary()
for fieldname, value in fields.iteritems():
body.write('--%s\r\n' % (BOUNDARY))
body.write('--%s\r\n' % (boundary))
if isinstance(value, tuple):
filename, data = value
@@ -43,8 +45,8 @@ def encode_multipart_formdata(fields):
body.write('\r\n')
body.write('--%s--\r\n' % (BOUNDARY))
body.write('--%s--\r\n' % (boundary))
content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
content_type = 'multipart/form-data; boundary=%s' % boundary
return body.getvalue(), content_type
+64
View File
@@ -0,0 +1,64 @@
from ._collections import RecentlyUsedContainer
from .connectionpool import HTTPConnectionPool, HTTPSConnectionPool, get_host
# Connection pool class to instantiate for each supported URL scheme.
pool_classes_by_scheme = dict(
    http=HTTPConnectionPool,
    https=HTTPSConnectionPool,
)
# Default port of each supported scheme. Used to build the pool key when a
# URL does not name a port, so that "https://host/" and "https://host:443/"
# share one pool.
port_by_scheme = {
    'http': 80,
    'https': 443,
}
class PoolManager(object):
    """
    Allows for arbitrary requests while transparently keeping track of
    necessary connection pools for you.

    num_pools
        Number of connection pools to cache before discarding the least
        recently used pool.

    Additional parameters are used to create fresh ConnectionPool instances.
    """

    # TODO: Make sure there are no memory leaks here.

    def __init__(self, num_pools=10, **connection_pool_kw):
        self.pools = RecentlyUsedContainer(num_pools)
        self.recently_used_pools = []
        self.connection_pool_kw = connection_pool_kw

    def connection_from_url(self, url):
        """
        Return the cached pool for ``url``'s (scheme, host, port), creating
        and caching a new one if there is none.

        Similar to connectionpool.connection_from_url but doesn't pass any
        additional keywords to the ConnectionPool constructor. Additional
        keywords are taken from the PoolManager constructor.
        """
        scheme, host, port = get_host(url)

        # A URL without an explicit port is keyed by its scheme's default
        # port.
        effective_port = port or port_by_scheme.get(scheme, 80)
        cache_key = (scheme, host, effective_port)

        cached = self.pools.get(cache_key)
        if not cached:
            # Nothing usable cached for this scheme/host/port: make a fresh
            # ConnectionPool of the desired type and remember it.
            factory = pool_classes_by_scheme[scheme]
            cached = factory(host, port, **self.connection_pool_kw)
            self.pools[cache_key] = cached

        return cached

    def urlopen(self, method, url, **kw):
        "Same as HTTP(S)ConnectionPool.urlopen"
        pool = self.connection_from_url(url)
        return pool.urlopen(method, url, **kw)
+173
View File
@@ -0,0 +1,173 @@
import gzip
import logging
import zlib
try:
from cStringIO import StringIO
except ImportError:
from StringIO import StringIO # pylint: disable-msg=W0404
from .exceptions import HTTPError
log = logging.getLogger(__name__)
def decode_gzip(data):
    """Return the decompressed content of the gzip-encoded string ``data``."""
    return gzip.GzipFile(fileobj=StringIO(data)).read()
def decode_deflate(data):
    """
    Return the decompressed content of the deflate-encoded string ``data``.

    The data is first treated as a zlib-wrapped stream; if zlib rejects it,
    it is decompressed again as a raw deflate stream (negative window bits).
    """
    try:
        decoded = zlib.decompress(data)
    except zlib.error:
        decoded = zlib.decompress(data, -zlib.MAX_WBITS)
    return decoded
class HTTPResponse(object):
    """
    HTTP Response container.

    Backwards-compatible to httplib's HTTPResponse but the response ``body`` is
    loaded and decoded on-demand when the ``data`` property is accessed.

    ``body`` may be a file-like object (anything with a ``read`` method), which
    is read on demand, or already-loaded data, which is kept as-is.

    Extra parameters for behaviour not present in httplib.HTTPResponse:

    preload_content
        If True, the response's body will be preloaded during construction.

    decode_content
        If True, the body is decoded according to the 'content-encoding'
        header (like 'gzip' and 'deflate'). If False, decoding is skipped and
        the raw data is used instead.

    original_response
        When this HTTPResponse wrapper is generated from an httplib.HTTPResponse
        object, it's convenient to include the original for debug purposes. It
        is also consulted after each read to decide whether the connection can
        be released.

    pool, connection
        The pool and the connection the response is read from, if any.
        ``release_conn()`` hands the connection back to the pool.
    """

    CONTENT_DECODERS = {
        'gzip': decode_gzip,
        'deflate': decode_deflate,
    }

    def __init__(self, body='', headers=None, status=0, version=0, reason=None,
                 strict=0, preload_content=False, decode_content=True,
                 original_response=None, pool=None, connection=None):
        self.headers = headers or {}
        self.status = status
        self.version = version
        self.reason = reason
        self.strict = strict

        self._decode_content = decode_content
        self._body = None
        self._fp = None
        self._original_response = original_response

        self._pool = pool
        self._connection = connection

        if hasattr(body, 'read'):
            self._fp = body
        elif body:
            # Already-loaded content: keep it so that ``data`` returns it
            # instead of silently dropping it.
            self._body = body

        # Preloading only makes sense when there is something to read from;
        # otherwise it would overwrite an already-loaded body with None.
        if preload_content and self._fp is not None:
            self._body = self.read(decode_content=decode_content)

    def release_conn(self):
        """
        Put the connection back into its pool, if both are known.

        The connection reference is dropped afterwards so that repeated calls
        cannot put the same connection into the pool more than once.
        """
        if not self._pool or not self._connection:
            return

        self._pool._put_conn(self._connection)
        self._connection = None

    @property
    def data(self):
        """
        The response body: the cached body if there is one, otherwise the
        result of reading (and caching) the underlying file object. None if
        there is neither.
        """
        # For backwards-compat with urllib3 0.4 and earlier.
        if self._body:
            return self._body

        if self._fp:
            return self.read(decode_content=self._decode_content,
                             cache_content=True)

    def read(self, amt=None, decode_content=True, cache_content=False):
        """
        Similar to ``httplib.HTTPResponse.read(amt=None)``.

        amt
            How much of the content to read. If specified, decoding and caching
            is skipped because we can't decode partial content nor does it make
            sense to cache partial content as the full response.

        decode_content
            If True, will attempt to decode the body based on the
            'content-encoding' header. (Overridden if ``amt`` is set.)

        cache_content
            If True, will save the returned data such that the same result is
            returned despite of the state of the underlying file object. This
            is useful if you want the ``.data`` property to continue working
            after having ``.read()`` the file object. (Overridden if ``amt`` is
            set.)

        Raises HTTPError if the body cannot be decoded with the decoder
        selected by the 'content-encoding' header.
        """
        content_encoding = self.headers.get('content-encoding')
        decoder = self.CONTENT_DECODERS.get(content_encoding)

        data = self._fp and self._fp.read(amt)

        try:
            if amt:
                return data

            # Empty or absent data has nothing to decode; handing it to the
            # decoders would only make them fail.
            if decode_content and decoder and data:
                try:
                    data = decoder(data)
                except (IOError, zlib.error):
                    # gzip reports failures as IOError, zlib as zlib.error.
                    raise HTTPError(
                        "Received response with content-encoding: %s, but "
                        "failed to decode it." % content_encoding)

            if cache_content:
                self._body = data

            return data

        finally:
            # Once the original response reports itself closed, the
            # connection is handed back to the pool.
            if self._original_response and self._original_response.isclosed():
                self.release_conn()

    @staticmethod
    def from_httplib(r, **response_kw):
        """
        Given an httplib.HTTPResponse instance ``r``, return a corresponding
        urllib3.HTTPResponse object.

        Remaining parameters are passed to the HTTPResponse constructor, along
        with ``original_response=r``.
        """
        return HTTPResponse(body=r,
                            headers=dict(r.getheaders()),
                            status=r.status,
                            version=r.version,
                            reason=r.reason,
                            strict=r.strict,
                            original_response=r,
                            **response_kw)

    # Backwards-compatibility methods for httplib.HTTPResponse
    def getheaders(self):
        return self.headers

    def getheader(self, name, default=None):
        return self.headers.get(name, default)