async adapters

Signed-off-by: Kenneth Reitz <me@kennethreitz.org>
This commit is contained in:
2018-03-16 11:14:10 -04:00
parent 74d71b25d3
commit ece1f25617
3 changed files with 373 additions and 2 deletions
+1 -1
View File
@@ -116,7 +116,7 @@ from .__version__ import __copyright__, __cake__
from .import utils
from .models import Request, Response, PreparedRequest
from .api import request, get, head, post, patch, put, delete, options
from .sessions import Session
from .sessions import Session, AsyncSession
from .status_codes import codes
from .exceptions import (
RequestException,
+152
View File
@@ -564,3 +564,155 @@ class HTTPAdapter(BaseAdapter):
raise
return self.build_response(request, resp)
class AsyncHTTPAdapter(HTTPAdapter):
"""docstring for AsyncHTTPAdapter"""
def __init__(self, *args, **kwargs):
super(AsyncHTTPAdapter, self).__init__(*args, **kwargs)
async def send(
self,
request,
stream=False,
timeout=None,
verify=True,
cert=None,
proxies=None,
):
"""Sends PreparedRequest object. Returns Response object.
:param request: The :class:`PreparedRequest <PreparedRequest>` being sent.
:param stream: (optional) Whether to stream the request content.
:param timeout: (optional) How long to wait for the server to send
data before giving up, as a float, or a :ref:`(connect timeout,
read timeout) <timeouts>` tuple.
:type timeout: float or tuple or urllib3 Timeout object
:param verify: (optional) Either a boolean, in which case it controls whether
we verify the server's TLS certificate, or a string, in which case it
must be a path to a CA bundle to use
:param cert: (optional) Any user-provided SSL certificate to be trusted.
:param proxies: (optional) The proxies dictionary to apply to the request.
:rtype: requests.Response
"""
conn = self.get_connection(request.url, proxies, verify, cert)
url = self.request_url(request, proxies)
self.add_headers(request)
chunked = not (
request.body is None or 'Content-Length' in request.headers
)
if isinstance(timeout, tuple):
try:
connect, read = timeout
timeout = TimeoutSauce(connect=connect, read=read)
except ValueError as e:
# this may raise a string formatting error.
err = (
"Invalid timeout {0}. Pass a (connect, read) "
"timeout tuple, or a single float to set "
"both timeouts to the same value".format(timeout)
)
raise ValueError(err)
elif isinstance(timeout, TimeoutSauce):
pass
else:
timeout = TimeoutSauce(connect=timeout, read=timeout)
try:
if not chunked:
resp = await requests_core.request(
method=request.method,
url=url,
body=request.body,
headers=request.headers,
redirect=False,
assert_same_host=False,
preload_content=False,
decode_content=False,
retries=self.max_retries,
timeout=timeout,
enforce_content_length=True,
pool=conn
)
# Send the request.
else:
if hasattr(conn, 'proxy_pool'):
conn = conn.proxy_pool
low_conn = conn._get_conn(timeout=DEFAULT_POOL_TIMEOUT)
try:
low_conn.putrequest(
request.method, url, skip_accept_encoding=True
)
for header, value in request.headers.items():
low_conn.putheader(header, value)
low_conn.endheaders()
for i in request.body:
chunk_size = len(i)
if chunk_size == 0:
continue
low_conn.send(hex(chunk_size)[2:].encode('utf-8'))
low_conn.send(b'\r\n')
low_conn.send(i)
low_conn.send(b'\r\n')
low_conn.send(b'0\r\n\r\n')
# Receive the response from the server
try:
# For Python 2.7, use buffering of HTTP responses
r = low_conn.getresponse(buffering=True)
except TypeError:
# For Python 3.3+ versions, this is the default
r = low_conn.getresponse()
resp = HTTPResponse.from_httplib(
r,
pool=conn,
connection=low_conn,
preload_content=False,
decode_content=False,
enforce_content_length=True,
request_method=request.method,
)
except:
# If we hit any problems here, clean up the connection.
# Then, reraise so that we can handle the actual exception.
low_conn.close()
raise
except (ProtocolError, socket.error) as err:
raise ConnectionError(err, request=request)
except MaxRetryError as e:
if isinstance(e.reason, ConnectTimeoutError):
# TODO: Remove this in 3.0.0: see #2811
if not isinstance(e.reason, NewConnectionError):
raise ConnectTimeout(e, request=request)
if isinstance(e.reason, ResponseError):
raise RetryError(e, request=request)
if isinstance(e.reason, _ProxyError):
raise ProxyError(e, request=request)
if isinstance(e.reason, _SSLError):
# This branch is for urllib3 v1.22 and later.
raise SSLError(e, request=request)
raise ConnectionError(e, request=request)
except ClosedPoolError as e:
raise ConnectionError(e, request=request)
except _ProxyError as e:
raise ProxyError(e)
except (_SSLError, _HTTPError) as e:
if isinstance(e, _SSLError):
# This branch is for urllib3 versions earlier than v1.22
raise SSLError(e, request=request)
elif isinstance(e, ReadTimeoutError):
raise ReadTimeout(e, request=request)
else:
raise
return self.build_response(request, resp)
+220 -1
View File
@@ -12,6 +12,8 @@ import time
from collections import Mapping, OrderedDict
from datetime import timedelta
from requests_core.http_manager._backends.trio_backend import TrioBackend
from .auth import _basic_auth_str
from .basics import cookielib, urljoin, urlparse, str
from .cookies import (
@@ -35,7 +37,7 @@ from .exceptions import (
)
from .structures import CaseInsensitiveDict
from .adapters import HTTPAdapter
from .adapters import HTTPAdapter, AsyncHTTPAdapter
from .utils import (
requote_uri,
@@ -743,3 +745,220 @@ class Session(SessionRedirectMixin):
def __setstate__(self, state):
for attr, value in state.items():
setattr(self, attr, value)
class AsyncSession(Session):
"""docstring for AsyncSession"""
def __init__(self, backend=None):
self.backend = backend or TrioBackend()
super(AsyncSession, self).__init__()
self.mount('https://', AsyncHTTPAdapter())
self.mount('http://', AsyncHTTPAdapter())
async def get(self, url, **kwargs):
r"""Sends a GET request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
kwargs.setdefault('allow_redirects', True)
return await self.request('GET', url, **kwargs)
async def options(self, url, **kwargs):
r"""Sends a OPTIONS request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
kwargs.setdefault('allow_redirects', True)
return await self.request('OPTIONS', url, **kwargs)
async def head(self, url, **kwargs):
r"""Sends a HEAD request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
kwargs.setdefault('allow_redirects', False)
return await self.request('HEAD', url, **kwargs)
async def post(self, url, data=None, json=None, **kwargs):
r"""Sends a POST request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
:param json: (optional) json to send in the body of the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
return await self.request('POST', url, data=data, json=json, **kwargs)
async def put(self, url, data=None, **kwargs):
r"""Sends a PUT request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
return await self.request('PUT', url, data=data, **kwargs)
async def patch(self, url, data=None, **kwargs):
r"""Sends a PATCH request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param data: (optional) Dictionary, bytes, or file-like object to send in the body of the :class:`Request`.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
return await self.request('PATCH', url, data=data, **kwargs)
async def delete(self, url, **kwargs):
r"""Sends a DELETE request. Returns :class:`Response` object.
:param url: URL for the new :class:`Request` object.
:param \*\*kwargs: Optional arguments that ``request`` takes.
:rtype: requests.Response
"""
return await self.request('DELETE', url, **kwargs)
async def request(
self,
method,
url,
params=None,
data=None,
headers=None,
cookies=None,
files=None,
auth=None,
timeout=None,
allow_redirects=True,
proxies=None,
hooks=None,
stream=None,
verify=None,
cert=None,
json=None,
):
"""Constructs a :class:`Request <Request>`, prepares it, and sends it.
Returns :class:`Response <Response>` object.
:param method: method for the new :class:`Request` object.
:param url: URL for the new :class:`Request` object.
:param params: (optional) Dictionary or bytes to be sent in the query
string for the :class:`Request`.
:param data: (optional) Dictionary, bytes, or file-like object to send
in the body of the :class:`Request`.
:param json: (optional) json to send in the body of the
:class:`Request`.
:param headers: (optional) Dictionary of HTTP Headers to send with the
:class:`Request`.
:param cookies: (optional) Dict or CookieJar object to send with the
:class:`Request`.
:param files: (optional) Dictionary of ``'filename': file-like-objects``
for multipart encoding upload.
:param auth: (optional) Auth tuple or callable to enable
Basic/Digest/Custom HTTP Auth.
:param timeout: (optional) How long to wait for the server to send
data before giving up, as a float, or a :ref:`(connect timeout,
read timeout) <timeouts>` tuple.
:type timeout: float or tuple
:param allow_redirects: (optional) Set to True by default.
:type allow_redirects: bool
:param proxies: (optional) Dictionary mapping protocol or protocol and
hostname to the URL of the proxy.
:param stream: (optional) whether to immediately download the response
content. Defaults to ``False``.
:param verify: (optional) Either a boolean, in which case it controls whether we verify
the server's TLS certificate, or a string, in which case it must be a path
to a CA bundle to use. Defaults to ``True``.
:param cert: (optional) if String, path to ssl client cert file (.pem).
If Tuple, ('cert', 'key') pair.
:rtype: requests.Response
"""
# Create the Request.
req = Request(
method=method.upper(),
url=url,
headers=headers,
files=files,
data=data or {},
json=json,
params=params or {},
auth=auth,
cookies=cookies,
hooks=hooks,
)
prep = self.prepare_request(req)
proxies = proxies or {}
settings = self.merge_environment_settings(
prep.url, proxies, stream, verify, cert
)
# Send the request.
send_kwargs = {'timeout': timeout, 'allow_redirects': allow_redirects}
send_kwargs.update(settings)
resp = await self.send(prep, **send_kwargs)
return resp
async def send(self, request, **kwargs):
"""Send a given PreparedRequest.
:rtype: requests.Response
"""
# Set defaults that the hooks can utilize to ensure they always have
# the correct parameters to reproduce the previous request.
kwargs.setdefault('stream', self.stream)
kwargs.setdefault('verify', self.verify)
kwargs.setdefault('cert', self.cert)
kwargs.setdefault('proxies', self.proxies)
# It's possible that users might accidentally send a Request object.
# Guard against that specific failure case.
if isinstance(request, Request):
raise ValueError('You can only send PreparedRequests.')
# Set up variables needed for resolve_redirects and dispatching of
# hooks
allow_redirects = kwargs.pop('allow_redirects', True)
stream = kwargs.get('stream')
hooks = request.hooks
# Get the appropriate adapter to use
adapter = self.get_adapter(url=request.url)
# Start time (approximately) of the request
start = preferred_clock()
# Send the request
r = await adapter.send(request, **kwargs)
# Total elapsed time of the request (approximately)
elapsed = preferred_clock() - start
r.elapsed = timedelta(seconds=elapsed)
# Response manipulation hooks.
r = dispatch_hook('response', hooks, r, **kwargs)
# Persist cookies
if r.history:
# If the hooks create history then we want those cookies too
for resp in r.history:
extract_cookies_to_jar(self.cookies, resp.request, resp.raw)
extract_cookies_to_jar(self.cookies, request, r.raw)
# Redirect resolving generator.
gen = self.resolve_redirects(r, request, **kwargs)
# Resolve redirects, if allowed.
history = [resp for resp in gen] if allow_redirects else []
# If there is a history, replace ``r`` with the last response
if history:
r = history.pop()
# If redirects aren't being followed, store the response on the Request for Response.next().
if not allow_redirects:
try:
r._next = next(
self.resolve_redirects(
r, request, yield_requests=True, **kwargs
)
)
except StopIteration:
pass
if not stream:
r.content
return r