diff --git a/requests/adapters.py b/requests/adapters.py index 0c037227..cf23f05f 100644 --- a/requests/adapters.py +++ b/requests/adapters.py @@ -11,7 +11,9 @@ import os.path import socket import requests_core +from requests_core.http_manager._backends import TrioBackend from requests_core.http_manager.poolmanager import PoolManager, proxy_from_url +from requests_core.http_manager._async.poolmanager import PoolManager as AsyncPoolManager from requests_core.http_manager.response import HTTPResponse from requests_core.http_manager.util import Timeout as TimeoutSauce from requests_core.http_manager.util.retry import Retry @@ -418,7 +420,7 @@ class HTTPAdapter(BaseAdapter): ) return headers - def send( + async def send( self, request, stream=False, @@ -563,13 +565,109 @@ class HTTPAdapter(BaseAdapter): else: raise - return self.build_response(request, resp) + return await self.build_response(request, resp) + class AsyncHTTPAdapter(HTTPAdapter): """docstring for AsyncHTTPAdapter""" - def __init__(self, *args, **kwargs): + def __init__(self, backend=None, *args, **kwargs): + self.backend = backend or TrioBackend() super(AsyncHTTPAdapter, self).__init__(*args, **kwargs) + async def build_response(self, req, resp): + """Builds a :class:`Response ` object from a urllib3 + response. This should not be called from user code, and is only exposed + for use when subclassing the + :class:`HTTPAdapter ` + + :param req: The :class:`PreparedRequest ` used to generate the response. + :param resp: The urllib3 response object. + :rtype: requests.Response + """ + response = Response() + # Fallback to None if there's no status_code, for whatever reason. + response.status_code = getattr(resp, 'status', None) + # Make headers case-insensitive. + response.headers = HTTPHeaderDict(getattr(resp, 'headers', {})) + # Set encoding. + response.encoding = get_encoding_from_headers(response.headers) + response.raw = resp + response.reason = response.raw.reason + if isinstance(req.url, bytes): + response.url = req.url.decode('utf-8') + else: + response.url = req.url + # Add new cookies from the server. + extract_cookies_to_jar(response.cookies, req, resp) + # Give the Response some context. + response.request = req + response.connection = self + return response + + def init_poolmanager( + self, connections, maxsize, block=DEFAULT_POOLBLOCK, **pool_kwargs + ): + """Initializes a urllib3 PoolManager. + + This method should not be called from user code, and is only + exposed for use when subclassing the + :class:`HTTPAdapter `. + + :param connections: The number of urllib3 connection pools to cache. + :param maxsize: The maximum number of connections to save in the pool. + :param block: Block when no free connections are available. + :param pool_kwargs: Extra keyword arguments used to initialize the Pool Manager. + """ + # save these values for pickling + self._pool_connections = connections + self._pool_maxsize = maxsize + self._pool_block = block + self.poolmanager = AsyncPoolManager( + num_pools=connections, + maxsize=maxsize, + block=block, + strict=True, + backend=self.backend, + **pool_kwargs, + ) + + def get_connection(self, url, proxies=None, verify=None, cert=None): + """Returns a urllib3 connection for the given URL. This should not be + called from user code, and is only exposed for use when subclassing the + :class:`HTTPAdapter `. + + :param url: The URL to connect to. + :param proxies: (optional) A Requests-style dictionary of proxies used on this request. + :rtype: urllib3.ConnectionPool + """ + pool_kwargs = _pool_kwargs(verify, cert) + proxy = select_proxy(url, proxies) + if proxy: + proxy = prepend_scheme_if_needed(proxy, 'http') + proxy_manager = self.proxy_manager_for(proxy) + conn = proxy_manager.connection_from_url( + url, pool_kwargs=pool_kwargs + ) + else: + # Only scheme should be lower case + parsed = urlparse(url) + url = parsed.geturl() + conn = self.poolmanager.connection_from_url( + url, pool_kwargs=pool_kwargs + ) + return conn + + def close(self): + """Disposes of any internal state. + + Currently, this closes the PoolManager and any active ProxyManager, + which closes any pooled connections. + """ + self.poolmanager.clear() + for proxy in self.proxy_manager.values(): + proxy.clear() + pass + async def send( self, request, @@ -595,6 +693,7 @@ class AsyncHTTPAdapter(HTTPAdapter): :rtype: requests.Response """ conn = self.get_connection(request.url, proxies, verify, cert) + url = self.request_url(request, proxies) self.add_headers(request) chunked = not ( @@ -633,6 +732,7 @@ class AsyncHTTPAdapter(HTTPAdapter): enforce_content_length=True, pool=conn ) + # Send the request. else: if hasattr(conn, 'proxy_pool'): @@ -658,7 +758,7 @@ class AsyncHTTPAdapter(HTTPAdapter): # Receive the response from the server try: # For Python 2.7, use buffering of HTTP responses - r = low_conn.getresponse(buffering=True) + r = alow_conn.getresponse(buffering=True) except TypeError: # For Python 3.3+ versions, this is the default r = low_conn.getresponse() @@ -715,4 +815,4 @@ class AsyncHTTPAdapter(HTTPAdapter): else: raise - return self.build_response(request, resp) + return await self.build_response(request, resp) diff --git a/requests/models.py b/requests/models.py index a2029b29..75843ed7 100644 --- a/requests/models.py +++ b/requests/models.py @@ -747,11 +747,11 @@ class Response(object): return self._next @property - def apparent_encoding(self): + async def apparent_encoding(self): """The apparent encoding, provided by the chardet library.""" - return chardet.detect(self.content)['encoding'] + return chardet.detect(await self.content)['encoding'] - def iter_content(self, chunk_size=1, decode_unicode=False): + async def iter_content(self, decode_unicode=False): """Iterates over the response data. When stream=True is set on the request, this avoids reading the content at once into memory for large responses. The chunk size is the number of bytes it should @@ -768,12 +768,15 @@ class Response(object): enumeration before invoking iter_content. """ - def generate(): + DEFAULT_CHUNK_SIZE = 1 + + async def generate(): # Special case for urllib3. if hasattr(self.raw, 'stream'): try: - for chunk in self.raw.stream( - chunk_size, # decode_content=True + async for chunk in self.raw.stream( + # chunk_size, decode_content=True + decode_content=True ): yield chunk @@ -793,7 +796,7 @@ class Response(object): else: # Standard file-like object. while True: - chunk = self.raw.read(chunk_size) + chunk = await self.raw.read(chunk_size) if not chunk: break @@ -804,15 +807,15 @@ class Response(object): if self._content_consumed and isinstance(self._content, bool): raise StreamConsumedError() - elif chunk_size is not None and not isinstance(chunk_size, int): - raise TypeError( - "chunk_size must be an int, it is instead a %s." % - type(chunk_size) - ) + # elif chunk_size is not None and not isinstance(chunk_size, int): + # raise TypeError( + # f"chunk_size must be an int, it is instead a {type(chunk_size)}." + # ) # simulate reading small chunks of the content - reused_chunks = iter_slices(self._content, chunk_size) - stream_chunks = generate() + reused_chunks = iter_slices(self._content, DEFAULT_CHUNK_SIZE) + stream_chunks = await generate().__anext__() + chunks = reused_chunks if self._content_consumed else stream_chunks if decode_unicode: if self.encoding is None: @@ -903,7 +906,7 @@ class Response(object): yield pending @property - def content(self): + async def content(self): """Content of the response, in bytes.""" if self._content is False: # Read the contents. @@ -915,17 +918,20 @@ class Response(object): if self.status_code == 0 or self.raw is None: self._content = None else: + # self._content = await self.iter_content(CONTENT_CHUNK_SIZE) + # print(bytes().join( + # [await self.iter_content(CONTENT_CHUNK_SIZE)] + # )) self._content = bytes().join( - self.iter_content(CONTENT_CHUNK_SIZE) - ) or bytes( - ) + [await self.iter_content()] + ) or bytes() self._content_consumed = True # don't need to release the connection; that's been handled by urllib3 # since we exhausted the data. return self._content @property - def text(self): + async def text(self): """Content of the response, in unicode. If Response.encoding is None, encoding will be guessed using @@ -939,7 +945,7 @@ class Response(object): # Try charset from content-type content = None encoding = self.encoding - if not self.content: + if not await self.content: return str('') # Fallback to auto-detected encoding. @@ -955,25 +961,26 @@ class Response(object): # A TypeError can be raised if encoding is None # # So we try blindly encoding. - content = str(self.content, errors='replace') + content = str(await self.content, errors='replace') return content - def json(self, **kwargs): + async def json(self, **kwargs): r"""Returns the json-encoded content of a response, if any. :param \*\*kwargs: Optional arguments that ``json.loads`` takes. :raises ValueError: If the response body does not contain valid json. """ - if not self.encoding and self.content and len(self.content) > 3: + if not self.encoding and await self.content and len(await self.content) > 3: # No encoding set. JSON RFC 4627 section 3 states we should expect # UTF-8, -16 or -32. Detect which one to use; If the detection or # decoding fails, fall back to `self.text` (using chardet to make # a best guess). - encoding = guess_json_utf(self.content) + encoding = guess_json_utf(await self.content) if encoding is not None: try: + content = await self.content return complexjson.loads( - self.content.decode(encoding), **kwargs + content.decode(encoding), **kwargs ) except UnicodeDecodeError: @@ -982,7 +989,7 @@ class Response(object): # and the server didn't bother to tell us what codec *was* # used. pass - return complexjson.loads(self.text, **kwargs) + return complexjson.loads(await self.text, **kwargs) @property def links(self): diff --git a/requests/sessions.py b/requests/sessions.py index fd4ee4ae..4fa2ca7d 100644 --- a/requests/sessions.py +++ b/requests/sessions.py @@ -752,8 +752,8 @@ class AsyncSession(Session): def __init__(self, backend=None): self.backend = backend or TrioBackend() super(AsyncSession, self).__init__() - self.mount('https://', AsyncHTTPAdapter()) - self.mount('http://', AsyncHTTPAdapter()) + self.mount('https://', AsyncHTTPAdapter(backend=self.backend)) + self.mount('http://', AsyncHTTPAdapter(backend=self.backend)) async def get(self, url, **kwargs): r"""Sends a GET request. Returns :class:`Response` object. @@ -960,5 +960,5 @@ class AsyncSession(Session): except StopIteration: pass if not stream: - r.content + await r.content return r