Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
This commit is contained in:
2018-03-16 18:11:21 -04:00
parent ece1f25617
commit 84c1eb2de2
3 changed files with 141 additions and 34 deletions
+105 -5
View File
@@ -11,7 +11,9 @@ import os.path
import socket
import requests_core
from requests_core.http_manager._backends import TrioBackend
from requests_core.http_manager.poolmanager import PoolManager, proxy_from_url
from requests_core.http_manager._async.poolmanager import PoolManager as AsyncPoolManager
from requests_core.http_manager.response import HTTPResponse
from requests_core.http_manager.util import Timeout as TimeoutSauce
from requests_core.http_manager.util.retry import Retry
@@ -418,7 +420,7 @@ class HTTPAdapter(BaseAdapter):
)
return headers
def send(
async def send(
self,
request,
stream=False,
@@ -563,13 +565,109 @@ class HTTPAdapter(BaseAdapter):
else:
raise
return self.build_response(request, resp)
return await self.build_response(request, resp)
class AsyncHTTPAdapter(HTTPAdapter):
"""docstring for AsyncHTTPAdapter"""
def __init__(self, *args, **kwargs):
def __init__(self, backend=None, *args, **kwargs):
self.backend = backend or TrioBackend()
super(AsyncHTTPAdapter, self).__init__(*args, **kwargs)
async def build_response(self, req, resp):
"""Builds a :class:`Response <requests.Response>` object from a urllib3
response. This should not be called from user code, and is only exposed
for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`
:param req: The :class:`PreparedRequest <PreparedRequest>` used to generate the response.
:param resp: The urllib3 response object.
:rtype: requests.Response
"""
response = Response()
# Fallback to None if there's no status_code, for whatever reason.
response.status_code = getattr(resp, 'status', None)
# Make headers case-insensitive.
response.headers = HTTPHeaderDict(getattr(resp, 'headers', {}))
# Set encoding.
response.encoding = get_encoding_from_headers(response.headers)
response.raw = resp
response.reason = response.raw.reason
if isinstance(req.url, bytes):
response.url = req.url.decode('utf-8')
else:
response.url = req.url
# Add new cookies from the server.
extract_cookies_to_jar(response.cookies, req, resp)
# Give the Response some context.
response.request = req
response.connection = self
return response
def init_poolmanager(
self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs
):
"""Initializes a urllib3 PoolManager.
This method should not be called from user code, and is only
exposed for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param connections: The number of urllib3 connection pools to cache.
:param maxsize: The maximum number of connections to save in the pool.
:param block: Block when no free connections are available.
:param pool_kwargs: Extra keyword arguments used to initialize the Pool Manager.
"""
# save these values for pickling
self._pool_connections = connections
self._pool_maxsize = maxsize
self._pool_block = block
self.poolmanager = AsyncPoolManager(
num_pools=connections,
maxsize=maxsize,
block=block,
strict=True,
backend=self.backend,
**pool_kwargs,
)
def get_connection(self, url, proxies=None, verify=None, cert=None):
"""Returns a urllib3 connection for the given URL. This should not be
called from user code, and is only exposed for use when subclassing the
:class:`HTTPAdapter <requests.adapters.HTTPAdapter>`.
:param url: The URL to connect to.
:param proxies: (optional) A Requests-style dictionary of proxies used on this request.
:rtype: urllib3.ConnectionPool
"""
pool_kwargs = _pool_kwargs(verify, cert)
proxy = select_proxy(url, proxies)
if proxy:
proxy = prepend_scheme_if_needed(proxy, 'http')
proxy_manager = self.proxy_manager_for(proxy)
conn = proxy_manager.connection_from_url(
url, pool_kwargs=pool_kwargs
)
else:
# Only scheme should be lower case
parsed = urlparse(url)
url = parsed.geturl()
conn = self.poolmanager.connection_from_url(
url, pool_kwargs=pool_kwargs
)
return conn
def close(self):
"""Disposes of any internal state.
Currently, this closes the PoolManager and any active ProxyManager,
which closes any pooled connections.
"""
self.poolmanager.clear()
for proxy in self.proxy_manager.values():
proxy.clear()
pass
async def send(
self,
request,
@@ -595,6 +693,7 @@ class AsyncHTTPAdapter(HTTPAdapter):
:rtype: requests.Response
"""
conn = self.get_connection(request.url, proxies, verify, cert)
url = self.request_url(request, proxies)
self.add_headers(request)
chunked = not (
@@ -633,6 +732,7 @@ class AsyncHTTPAdapter(HTTPAdapter):
enforce_content_length=True,
pool=conn
)
# Send the request.
else:
if hasattr(conn, 'proxy_pool'):
@@ -658,7 +758,7 @@ class AsyncHTTPAdapter(HTTPAdapter):
# Receive the response from the server
try:
# For Python 2.7, use buffering of HTTP responses
r = low_conn.getresponse(buffering=True)
r = alow_conn.getresponse(buffering=True)
except TypeError:
# For Python 3.3+ versions, this is the default
r = low_conn.getresponse()
@@ -715,4 +815,4 @@ class AsyncHTTPAdapter(HTTPAdapter):
else:
raise
return self.build_response(request, resp)
return await self.build_response(request, resp)
+33 -26
View File
@@ -747,11 +747,11 @@ class Response(object):
return self._next
@property
def apparent_encoding(self):
async def apparent_encoding(self):
"""The apparent encoding, provided by the chardet library."""
return chardet.detect(self.content)['encoding']
return chardet.detect(await self.content)['encoding']
def iter_content(self, chunk_size=1, decode_unicode=False):
async def iter_content(self, decode_unicode=False):
"""Iterates over the response data. When stream=True is set on the
request, this avoids reading the content at once into memory for
large responses. The chunk size is the number of bytes it should
@@ -768,12 +768,15 @@ class Response(object):
enumeration before invoking iter_content.
"""
def generate():
DEFAULT_CHUNK_SIZE = 1
async def generate():
# Special case for urllib3.
if hasattr(self.raw, 'stream'):
try:
for chunk in self.raw.stream(
chunk_size, # decode_content=True
async for chunk in self.raw.stream(
# chunk_size, decode_content=True
decode_content=True
):
yield chunk
@@ -793,7 +796,7 @@ class Response(object):
else:
# Standard file-like object.
while True:
chunk = self.raw.read(chunk_size)
chunk = await self.raw.read(chunk_size)
if not chunk:
break
@@ -804,15 +807,15 @@ class Response(object):
if self._content_consumed and isinstance(self._content, bool):
raise StreamConsumedError()
elif chunk_size is not None and not isinstance(chunk_size, int):
raise TypeError(
"chunk_size must be an int, it is instead a %s." %
type(chunk_size)
)
# elif chunk_size is not None and not isinstance(chunk_size, int):
# raise TypeError(
# f"chunk_size must be an int, it is instead a {type(chunk_size)}."
# )
# simulate reading small chunks of the content
reused_chunks = iter_slices(self._content, chunk_size)
stream_chunks = generate()
reused_chunks = iter_slices(self._content, DEFAULT_CHUNK_SIZE)
stream_chunks = await generate().__anext__()
chunks = reused_chunks if self._content_consumed else stream_chunks
if decode_unicode:
if self.encoding is None:
@@ -903,7 +906,7 @@ class Response(object):
yield pending
@property
def content(self):
async def content(self):
"""Content of the response, in bytes."""
if self._content is False:
# Read the contents.
@@ -915,17 +918,20 @@ class Response(object):
if self.status_code == 0 or self.raw is None:
self._content = None
else:
# self._content = await self.iter_content(CONTENT_CHUNK_SIZE)
# print(bytes().join(
# [await self.iter_content(CONTENT_CHUNK_SIZE)]
# ))
self._content = bytes().join(
self.iter_content(CONTENT_CHUNK_SIZE)
) or bytes(
)
[await self.iter_content()]
) or bytes()
self._content_consumed = True
# don't need to release the connection; that's been handled by urllib3
# since we exhausted the data.
return self._content
@property
def text(self):
async def text(self):
"""Content of the response, in unicode.
If Response.encoding is None, encoding will be guessed using
@@ -939,7 +945,7 @@ class Response(object):
# Try charset from content-type
content = None
encoding = self.encoding
if not self.content:
if not await self.content:
return str('')
# Fallback to auto-detected encoding.
@@ -955,25 +961,26 @@ class Response(object):
# A TypeError can be raised if encoding is None
#
# So we try blindly encoding.
content = str(self.content, errors='replace')
content = str(await self.content, errors='replace')
return content
def json(self, **kwargs):
async def json(self, **kwargs):
r"""Returns the json-encoded content of a response, if any.
:param \*\*kwargs: Optional arguments that ``json.loads`` takes.
:raises ValueError: If the response body does not contain valid json.
"""
if not self.encoding and self.content and len(self.content) > 3:
if not self.encoding and await self.content and len(await self.content) > 3:
# No encoding set. JSON RFC 4627 section 3 states we should expect
# UTF-8, -16 or -32. Detect which one to use; If the detection or
# decoding fails, fall back to `self.text` (using chardet to make
# a best guess).
encoding = guess_json_utf(self.content)
encoding = guess_json_utf(await self.content)
if encoding is not None:
try:
content = await self.content
return complexjson.loads(
self.content.decode(encoding), **kwargs
content.decode(encoding), **kwargs
)
except UnicodeDecodeError:
@@ -982,7 +989,7 @@ class Response(object):
# and the server didn't bother to tell us what codec *was*
# used.
pass
return complexjson.loads(self.text, **kwargs)
return complexjson.loads(await self.text, **kwargs)
@property
def links(self):
+3 -3
View File
@@ -752,8 +752,8 @@ class AsyncSession(Session):
def __init__(self, backend=None):
self.backend = backend or TrioBackend()
super(AsyncSession, self).__init__()
self.mount('https://', AsyncHTTPAdapter())
self.mount('http://', AsyncHTTPAdapter())
self.mount('https://', AsyncHTTPAdapter(backend=self.backend))
self.mount('http://', AsyncHTTPAdapter(backend=self.backend))
async def get(self, url, **kwargs):
r"""Sends a GET request. Returns :class:`Response` object.
@@ -960,5 +960,5 @@ class AsyncSession(Session):
except StopIteration:
pass
if not stream:
r.content
await r.content
return r