mirror of
https://github.com/kennethreitz/requests-html.git
synced 2026-06-05 14:50:20 +00:00
725 lines
25 KiB
Python
725 lines
25 KiB
Python
import sys
|
||
import asyncio
|
||
from urllib.parse import urlparse, urlunparse, urljoin
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
from concurrent.futures._base import TimeoutError
|
||
from functools import partial
|
||
from typing import Set, Union, List, MutableMapping, Optional
|
||
|
||
import pyppeteer
|
||
import requests
|
||
from pyquery import PyQuery
|
||
|
||
from fake_useragent import UserAgent
|
||
from lxml.html.clean import Cleaner
|
||
import lxml
|
||
from lxml import etree
|
||
from lxml.html import HtmlElement
|
||
from lxml.html import tostring as lxml_html_tostring
|
||
from lxml.html.soupparser import fromstring as soup_parse
|
||
from parse import search as parse_search
|
||
from parse import findall, Result
|
||
from w3lib.encoding import html_to_unicode
|
||
|
||
DEFAULT_ENCODING = 'utf-8'
|
||
DEFAULT_URL = 'https://example.org/'
|
||
DEFAULT_USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_6) AppleWebKit/603.3.8 (KHTML, like Gecko) Version/10.1.2 Safari/603.3.8'
|
||
DEFAULT_NEXT_SYMBOL = ['next', 'more', 'older']
|
||
|
||
cleaner = Cleaner()
|
||
cleaner.javascript = True
|
||
cleaner.style = True
|
||
|
||
useragent = None
|
||
|
||
# Typing.
|
||
_Find = Union[List['Element'], 'Element']
|
||
_XPath = Union[List[str], List['Element'], str, 'Element']
|
||
_Result = Union[List['Result'], 'Result']
|
||
_HTML = Union[str, bytes]
|
||
_BaseHTML = str
|
||
_UserAgent = str
|
||
_DefaultEncoding = str
|
||
_URL = str
|
||
_RawHTML = bytes
|
||
_Encoding = str
|
||
_LXML = HtmlElement
|
||
_Text = str
|
||
_Search = Result
|
||
_Containing = Union[str, List[str]]
|
||
_Links = Set[str]
|
||
_Attrs = MutableMapping
|
||
_Next = Union['HTML', List[str]]
|
||
_NextSymbol = List[str]
|
||
|
||
# Sanity checking.
|
||
try:
|
||
assert sys.version_info.major == 3
|
||
assert sys.version_info.minor > 5
|
||
except AssertionError:
|
||
raise RuntimeError('Requests-HTML requires Python 3.6+!')
|
||
|
||
|
||
class MaxRetries(Exception):
|
||
|
||
def __init__(self, message):
|
||
self.message = message
|
||
|
||
|
||
class BaseParser:
|
||
"""A basic HTML/Element Parser, for Humans.
|
||
|
||
:param element: The element from which to base the parsing upon.
|
||
:param default_encoding: Which encoding to default to.
|
||
:param html: HTML from which to base the parsing upon (optional).
|
||
:param url: The URL from which the HTML originated, used for ``absolute_links``.
|
||
|
||
"""
|
||
|
||
def __init__(self, *, element, default_encoding: _DefaultEncoding = None, html: _HTML = None, url: _URL) -> None:
|
||
self.element = element
|
||
self.url = url
|
||
self.skip_anchors = True
|
||
self.default_encoding = default_encoding
|
||
self._encoding = None
|
||
self._html = html.encode(DEFAULT_ENCODING) if isinstance(html, str) else html
|
||
self._lxml = None
|
||
self._pq = None
|
||
|
||
@property
|
||
def raw_html(self) -> _RawHTML:
|
||
"""Bytes representation of the HTML content.
|
||
(`learn more <http://www.diveintopython3.net/strings.html>`_).
|
||
"""
|
||
if self._html:
|
||
return self._html
|
||
else:
|
||
return etree.tostring(self.element, encoding='unicode').strip().encode(self.encoding)
|
||
|
||
@property
|
||
def html(self) -> _BaseHTML:
|
||
"""Unicode representation of the HTML content
|
||
(`learn more <http://www.diveintopython3.net/strings.html>`_).
|
||
"""
|
||
if self._html:
|
||
return self.raw_html.decode(self.encoding, errors='replace')
|
||
else:
|
||
return etree.tostring(self.element, encoding='unicode').strip()
|
||
|
||
@html.setter
|
||
def html(self, html: str) -> None:
|
||
self._html = html.encode(self.encoding)
|
||
|
||
@raw_html.setter
|
||
def raw_html(self, html: bytes) -> None:
|
||
"""Property setter for self.html."""
|
||
self._html = html
|
||
|
||
@property
|
||
def encoding(self) -> _Encoding:
|
||
"""The encoding string to be used, extracted from the HTML and
|
||
:class:`HTMLResponse <HTMLResponse>` headers.
|
||
"""
|
||
if self._encoding:
|
||
return self._encoding
|
||
|
||
# Scan meta tags for charset.
|
||
if self._html:
|
||
self._encoding = html_to_unicode(self.default_encoding, self._html)[0]
|
||
# Fall back to requests' detected encoding if decode fails.
|
||
try:
|
||
self.raw_html.decode(self.encoding, errors='replace')
|
||
except UnicodeDecodeError:
|
||
self._encoding = self.default_encoding
|
||
|
||
|
||
return self._encoding if self._encoding else self.default_encoding
|
||
|
||
@encoding.setter
|
||
def encoding(self, enc: str) -> None:
|
||
"""Property setter for self.encoding."""
|
||
self._encoding = enc
|
||
|
||
@property
|
||
def pq(self) -> PyQuery:
|
||
"""`PyQuery <https://pythonhosted.org/pyquery/>`_ representation
|
||
of the :class:`Element <Element>` or :class:`HTML <HTML>`.
|
||
"""
|
||
if self._pq is None:
|
||
self._pq = PyQuery(self.lxml)
|
||
|
||
return self._pq
|
||
|
||
@property
|
||
def lxml(self) -> HtmlElement:
|
||
"""`lxml <http://lxml.de>`_ representation of the
|
||
:class:`Element <Element>` or :class:`HTML <HTML>`.
|
||
"""
|
||
if self._lxml is None:
|
||
try:
|
||
self._lxml = soup_parse(self.html, features='html.parser')
|
||
except ValueError:
|
||
self._lxml = lxml.html.fromstring(self.raw_html)
|
||
|
||
return self._lxml
|
||
|
||
@property
|
||
def text(self) -> _Text:
|
||
"""The text content of the
|
||
:class:`Element <Element>` or :class:`HTML <HTML>`.
|
||
"""
|
||
return self.pq.text()
|
||
|
||
@property
|
||
def full_text(self) -> _Text:
|
||
"""The full text content (including links) of the
|
||
:class:`Element <Element>` or :class:`HTML <HTML>`.
|
||
"""
|
||
return self.lxml.text_content()
|
||
|
||
def find(self, selector: str = "*", *, containing: _Containing = None, clean: bool = False, first: bool = False, _encoding: str = None) -> _Find:
|
||
"""Given a CSS Selector, returns a list of
|
||
:class:`Element <Element>` objects or a single one.
|
||
|
||
:param selector: CSS Selector to use.
|
||
:param clean: Whether or not to sanitize the found HTML of ``<script>`` and ``<style>`` tags.
|
||
:param containing: If specified, only return elements that contain the provided text.
|
||
:param first: Whether or not to return just the first result.
|
||
:param _encoding: The encoding format.
|
||
|
||
Example CSS Selectors:
|
||
|
||
- ``a``
|
||
- ``a.someClass``
|
||
- ``a#someID``
|
||
- ``a[target=_blank]``
|
||
|
||
See W3School's `CSS Selectors Reference
|
||
<https://www.w3schools.com/cssref/css_selectors.asp>`_
|
||
for more details.
|
||
|
||
If ``first`` is ``True``, only returns the first
|
||
:class:`Element <Element>` found.
|
||
"""
|
||
|
||
# Convert a single containing into a list.
|
||
if isinstance(containing, str):
|
||
containing = [containing]
|
||
|
||
encoding = _encoding or self.encoding
|
||
elements = [
|
||
Element(element=found, url=self.url, default_encoding=encoding)
|
||
for found in self.pq(selector)
|
||
]
|
||
|
||
if containing:
|
||
elements_copy = elements.copy()
|
||
elements = []
|
||
|
||
for element in elements_copy:
|
||
if any([c.lower() in element.full_text.lower() for c in containing]):
|
||
elements.append(element)
|
||
|
||
elements.reverse()
|
||
|
||
# Sanitize the found HTML.
|
||
if clean:
|
||
elements_copy = elements.copy()
|
||
elements = []
|
||
|
||
for element in elements_copy:
|
||
element.raw_html = lxml_html_tostring(cleaner.clean_html(element.lxml))
|
||
elements.append(element)
|
||
|
||
return _get_first_or_list(elements, first)
|
||
|
||
def xpath(self, selector: str, *, clean: bool = False, first: bool = False, _encoding: str = None) -> _XPath:
|
||
"""Given an XPath selector, returns a list of
|
||
:class:`Element <Element>` objects or a single one.
|
||
|
||
:param selector: XPath Selector to use.
|
||
:param clean: Whether or not to sanitize the found HTML of ``<script>`` and ``<style>`` tags.
|
||
:param first: Whether or not to return just the first result.
|
||
:param _encoding: The encoding format.
|
||
|
||
If a sub-selector is specified (e.g. ``//a/@href``), a simple
|
||
list of results is returned.
|
||
|
||
See W3School's `XPath Examples
|
||
<https://www.w3schools.com/xml/xpath_examples.asp>`_
|
||
for more details.
|
||
|
||
If ``first`` is ``True``, only returns the first
|
||
:class:`Element <Element>` found.
|
||
"""
|
||
selected = self.lxml.xpath(selector)
|
||
|
||
elements = [
|
||
Element(element=selection, url=self.url, default_encoding=_encoding or self.encoding)
|
||
if not isinstance(selection, etree._ElementUnicodeResult) else str(selection)
|
||
for selection in selected
|
||
]
|
||
|
||
# Sanitize the found HTML.
|
||
if clean:
|
||
elements_copy = elements.copy()
|
||
elements = []
|
||
|
||
for element in elements_copy:
|
||
element.raw_html = lxml_html_tostring(cleaner.clean_html(element.lxml))
|
||
elements.append(element)
|
||
|
||
return _get_first_or_list(elements, first)
|
||
|
||
def search(self, template: str) -> Result:
|
||
"""Search the :class:`Element <Element>` for the given Parse template.
|
||
|
||
:param template: The Parse template to use.
|
||
"""
|
||
|
||
return parse_search(template, self.html)
|
||
|
||
def search_all(self, template: str) -> _Result:
|
||
"""Search the :class:`Element <Element>` (multiple times) for the given parse
|
||
template.
|
||
|
||
:param template: The Parse template to use.
|
||
"""
|
||
return [r for r in findall(template, self.html)]
|
||
|
||
@property
|
||
def links(self) -> _Links:
|
||
"""All found links on page, in as–is form."""
|
||
|
||
def gen():
|
||
for link in self.find('a'):
|
||
|
||
try:
|
||
href = link.attrs['href'].strip()
|
||
if href and not (href.startswith('#') and self.skip_anchors) and not href.startswith(('javascript:', 'mailto:')):
|
||
yield href
|
||
except KeyError:
|
||
pass
|
||
|
||
return set(gen())
|
||
|
||
def _make_absolute(self, link):
|
||
"""Makes a given link absolute."""
|
||
|
||
# Parse the link with stdlib.
|
||
parsed = urlparse(link)._asdict()
|
||
|
||
# If link is relative, then join it with base_url.
|
||
if not parsed['netloc']:
|
||
return urljoin(self.base_url, link)
|
||
|
||
# Link is absolute; if it lacks a scheme, add one from base_url.
|
||
if not parsed['scheme']:
|
||
parsed['scheme'] = urlparse(self.base_url).scheme
|
||
|
||
# Reconstruct the URL to incorporate the new scheme.
|
||
parsed = (v for v in parsed.values())
|
||
return urlunparse(parsed)
|
||
|
||
# Link is absolute and complete with scheme; nothing to be done here.
|
||
return link
|
||
|
||
|
||
@property
|
||
def absolute_links(self) -> _Links:
|
||
"""All found links on page, in absolute form
|
||
(`learn more <https://www.navegabem.com/absolute-or-relative-links.html>`_).
|
||
"""
|
||
|
||
def gen():
|
||
for link in self.links:
|
||
yield self._make_absolute(link)
|
||
|
||
return set(gen())
|
||
|
||
@property
|
||
def base_url(self) -> _URL:
|
||
"""The base URL for the page. Supports the ``<base>`` tag
|
||
(`learn more <https://www.w3schools.com/tags/tag_base.asp>`_)."""
|
||
|
||
# Support for <base> tag.
|
||
base = self.find('base', first=True)
|
||
if base:
|
||
result = base.attrs.get('href', '').strip()
|
||
if result:
|
||
return result
|
||
|
||
# Parse the url to separate out the path
|
||
parsed = urlparse(self.url)._asdict()
|
||
|
||
# Remove any part of the path after the last '/'
|
||
parsed['path'] = '/'.join(parsed['path'].split('/')[:-1]) + '/'
|
||
|
||
# Reconstruct the url with the modified path
|
||
parsed = (v for v in parsed.values())
|
||
url = urlunparse(parsed)
|
||
|
||
return url
|
||
|
||
|
||
class Element(BaseParser):
|
||
"""An element of HTML.
|
||
|
||
:param element: The element from which to base the parsing upon.
|
||
:param url: The URL from which the HTML originated, used for ``absolute_links``.
|
||
:param default_encoding: Which encoding to default to.
|
||
"""
|
||
|
||
__slots__ = [
|
||
'element', 'url', 'skip_anchors', 'default_encoding', '_encoding',
|
||
'_html', '_lxml', '_pq', '_attrs', 'session'
|
||
]
|
||
|
||
def __init__(self, *, element, url: _URL, default_encoding: _DefaultEncoding = None) -> None:
|
||
super(Element, self).__init__(element=element, url=url, default_encoding=default_encoding)
|
||
self.element = element
|
||
self.tag = element.tag
|
||
self.lineno = element.sourceline
|
||
self._attrs = None
|
||
|
||
def __repr__(self) -> str:
|
||
attrs = ['{}={}'.format(attr, repr(self.attrs[attr])) for attr in self.attrs]
|
||
return "<Element {} {}>".format(repr(self.element.tag), ' '.join(attrs))
|
||
|
||
@property
|
||
def attrs(self) -> _Attrs:
|
||
"""Returns a dictionary of the attributes of the :class:`Element <Element>`
|
||
(`learn more <https://www.w3schools.com/tags/ref_attributes.asp>`_).
|
||
"""
|
||
if self._attrs is None:
|
||
self._attrs = {k: v for k, v in self.element.items()}
|
||
|
||
# Split class and rel up, as there are ussually many of them:
|
||
for attr in ['class', 'rel']:
|
||
if attr in self._attrs:
|
||
self._attrs[attr] = tuple(self._attrs[attr].split())
|
||
|
||
return self._attrs
|
||
|
||
|
||
class HTML(BaseParser):
|
||
"""An HTML document, ready for parsing.
|
||
|
||
:param url: The URL from which the HTML originated, used for ``absolute_links``.
|
||
:param html: HTML from which to base the parsing upon (optional).
|
||
:param default_encoding: Which encoding to default to.
|
||
"""
|
||
|
||
def __init__(self, *, session: Union['HTMLSession', 'AsyncHTMLSession'] = None, url: str = DEFAULT_URL, html: _HTML, default_encoding: str = DEFAULT_ENCODING) -> None:
|
||
|
||
# Convert incoming unicode HTML into bytes.
|
||
if isinstance(html, str):
|
||
html = html.encode(DEFAULT_ENCODING)
|
||
|
||
super(HTML, self).__init__(
|
||
# Convert unicode HTML to bytes.
|
||
element=PyQuery(html)('html') or PyQuery(f'<html>{html}</html>')('html'),
|
||
html=html,
|
||
url=url,
|
||
default_encoding=default_encoding
|
||
)
|
||
self.session = session or HTMLSession()
|
||
self.page = None
|
||
self.next_symbol = DEFAULT_NEXT_SYMBOL
|
||
|
||
def __repr__(self) -> str:
|
||
return f"<HTML url={self.url!r}>"
|
||
|
||
def _next(self, fetch: bool = False, next_symbol: _NextSymbol = DEFAULT_NEXT_SYMBOL) -> _Next:
|
||
"""Attempts to find the next page, if there is one. If ``fetch``
|
||
is ``True`` (default), returns :class:`HTML <HTML>` object of
|
||
next page. If ``fetch`` is ``False``, simply returns the next URL.
|
||
|
||
"""
|
||
|
||
def get_next():
|
||
candidates = self.find('a', containing=next_symbol)
|
||
|
||
for candidate in candidates:
|
||
if candidate.attrs.get('href'):
|
||
# Support 'next' rel (e.g. reddit).
|
||
if 'next' in candidate.attrs.get('rel', []):
|
||
return candidate.attrs['href']
|
||
|
||
# Support 'next' in classnames.
|
||
for _class in candidate.attrs.get('class', []):
|
||
if 'next' in _class:
|
||
return candidate.attrs['href']
|
||
|
||
if 'page' in candidate.attrs['href']:
|
||
return candidate.attrs['href']
|
||
|
||
try:
|
||
# Resort to the last candidate.
|
||
return candidates[-1].attrs['href']
|
||
except IndexError:
|
||
return None
|
||
|
||
__next = get_next()
|
||
if __next:
|
||
url = self._make_absolute(__next)
|
||
else:
|
||
return None
|
||
|
||
if fetch:
|
||
return self.session.get(url)
|
||
else:
|
||
return url
|
||
|
||
def __iter__(self):
|
||
|
||
next = self
|
||
|
||
while True:
|
||
yield next
|
||
try:
|
||
next = next._next(fetch=True, next_symbol=self.next_symbol).html
|
||
except AttributeError:
|
||
break
|
||
|
||
def __next__(self):
|
||
return self._next(fetch=True, next_symbol=self.next_symbol).html
|
||
|
||
def add_next_symbol(self, next_symbol):
|
||
self.next_symbol.append(next_symbol)
|
||
|
||
def render(self, retries: int = 8, script: str = None, wait: float = 0.2, scrolldown=False, sleep: int = 0, reload: bool = True, timeout: Union[float, int] = 8.0, keep_page: bool = False):
|
||
"""Reloads the response in Chromium, and replaces HTML content
|
||
with an updated version, with JavaScript executed.
|
||
|
||
:param retries: The number of times to retry loading the page in Chromium.
|
||
:param script: JavaScript to execute upon page load (optional).
|
||
:param wait: The number of seconds to wait before loading the page, preventing timeouts (optional).
|
||
:param scrolldown: Integer, if provided, of how many times to page down.
|
||
:param sleep: Integer, if provided, of how many long to sleep after initial render.
|
||
:param reload: If ``False``, content will not be loaded from the browser, but will be provided from memory.
|
||
:param keep_page: If ``True`` will allow you to interact with the browser page through ``r.html.page``.
|
||
|
||
If ``scrolldown`` is specified, the page will scrolldown the specified
|
||
number of times, after sleeping the specified amount of time
|
||
(e.g. ``scrolldown=10, sleep=1``).
|
||
|
||
If just ``sleep`` is provided, the rendering will wait *n* seconds, before
|
||
returning.
|
||
|
||
If ``script`` is specified, it will execute the provided JavaScript at
|
||
runtime. Example:
|
||
|
||
.. code-block:: python
|
||
|
||
script = \"\"\"
|
||
() => {
|
||
return {
|
||
width: document.documentElement.clientWidth,
|
||
height: document.documentElement.clientHeight,
|
||
deviceScaleFactor: window.devicePixelRatio,
|
||
}
|
||
}
|
||
\"\"\"
|
||
|
||
Returns the return value of the executed ``script``, if any is provided:
|
||
|
||
.. code-block:: python
|
||
|
||
>>> r.html.render(script=script)
|
||
{'width': 800, 'height': 600, 'deviceScaleFactor': 1}
|
||
|
||
Warning: the first time you run this method, it will download
|
||
Chromium into your home directory (``~/.pyppeteer``).
|
||
"""
|
||
async def _async_render(*, url: str, script: str = None, scrolldown, sleep: int, wait: float, reload, content: Optional[str], timeout: Union[float, int], keep_page: bool):
|
||
try:
|
||
page = await self.session.browser.newPage()
|
||
|
||
# Wait before rendering the page, to prevent timeouts.
|
||
await asyncio.sleep(wait)
|
||
|
||
# Load the given page (GET request, obviously.)
|
||
if reload:
|
||
await page.goto(url, options={'timeout': int(timeout * 1000)})
|
||
else:
|
||
await page.goto(f'data:text/html,{self.html}', options={'timeout': int(timeout * 1000)})
|
||
|
||
result = None
|
||
if script:
|
||
result = await page.evaluate(script)
|
||
|
||
if scrolldown:
|
||
for _ in range(scrolldown):
|
||
await page._keyboard.down('PageDown')
|
||
await asyncio.sleep(sleep)
|
||
else:
|
||
await asyncio.sleep(sleep)
|
||
|
||
if scrolldown:
|
||
await page._keyboard.up('PageDown')
|
||
|
||
# Return the content of the page, JavaScript evaluated.
|
||
content = await page.content()
|
||
if not keep_page:
|
||
await page.close()
|
||
page = None
|
||
return content, result, page
|
||
except TimeoutError:
|
||
await page.close()
|
||
page = None
|
||
return None
|
||
|
||
self.session.browser # Automatically create a event loop and browser
|
||
content = None
|
||
|
||
# Automatically set Reload to False, if example URL is being used.
|
||
if self.url == DEFAULT_URL:
|
||
reload = False
|
||
|
||
for _ in range(retries):
|
||
if not content:
|
||
try:
|
||
|
||
content, result, page = self.session.loop.run_until_complete(_async_render(url=self.url, script=script, sleep=sleep, wait=wait, content=self.html, reload=reload, scrolldown=scrolldown, timeout=timeout, keep_page=keep_page))
|
||
except TypeError:
|
||
pass
|
||
else:
|
||
break
|
||
|
||
if not content:
|
||
raise MaxRetries("Unable to render the page. Try increasing timeout")
|
||
|
||
html = HTML(url=self.url, html=content.encode(DEFAULT_ENCODING), default_encoding=DEFAULT_ENCODING)
|
||
self.__dict__.update(html.__dict__)
|
||
self.page = page
|
||
return result
|
||
|
||
|
||
class HTMLResponse(requests.Response):
|
||
"""An HTML-enabled :class:`requests.Response <requests.Response>` object.
|
||
Effectively the same, but with an intelligent ``.html`` property added.
|
||
"""
|
||
|
||
def __init__(self, session: Union['HTMLSession', 'AsyncHTMLSession']) -> None:
|
||
super(HTMLResponse, self).__init__()
|
||
self._html = None # type: HTML
|
||
self.session = session
|
||
|
||
@property
|
||
def html(self) -> HTML:
|
||
if not self._html:
|
||
self._html = HTML(session=self.session, url=self.url, html=self.content, default_encoding=self.encoding)
|
||
|
||
return self._html
|
||
|
||
@classmethod
|
||
def _from_response(cls, response, session: Union['HTMLSession', 'AsyncHTMLSession']):
|
||
html_r = cls(session=session)
|
||
html_r.__dict__.update(response.__dict__)
|
||
return html_r
|
||
|
||
|
||
def user_agent(style=None) -> _UserAgent:
|
||
"""Returns an apparently legit user-agent, if not requested one of a specific
|
||
style. Defaults to a Chrome-style User-Agent.
|
||
"""
|
||
global useragent
|
||
if (not useragent) and style:
|
||
useragent = UserAgent()
|
||
|
||
return useragent[style] if style else DEFAULT_USER_AGENT
|
||
|
||
|
||
def _get_first_or_list(l, first=False):
|
||
if first:
|
||
try:
|
||
return l[0]
|
||
except IndexError:
|
||
return None
|
||
else:
|
||
return l
|
||
|
||
|
||
class HTMLSession(requests.Session):
|
||
"""A consumable session, for cookie persistence and connection pooling,
|
||
amongst other things.
|
||
"""
|
||
|
||
def __init__(self, mock_browser=True, verify=False):
|
||
super(HTMLSession, self).__init__()
|
||
|
||
# Mock a web browser's user agent.
|
||
if mock_browser:
|
||
self.headers['User-Agent'] = user_agent()
|
||
|
||
self.hooks = {'response': self._handle_response}
|
||
self.verify = verify
|
||
|
||
self.__browser_args = browser_argsverify
|
||
|
||
@staticmethod
|
||
def _handle_response(response, **kwargs) -> HTMLResponse:
|
||
"""Requests HTTP Response handler. Attaches .html property to
|
||
class:`requests.Response <requests.Response>` objects.
|
||
"""
|
||
if not response.encoding:
|
||
response.encoding = DEFAULT_ENCODING
|
||
|
||
return response
|
||
|
||
def request(self, *args, **kwargs) -> HTMLResponse:
|
||
"""Makes an HTTP Request, with mocked User–Agent headers.
|
||
Returns a class:`HTTPResponse <HTTPResponse>`.
|
||
"""
|
||
# Convert Request object into HTTPRequest object.
|
||
r = super(HTMLSession, self).request(*args, **kwargs)
|
||
|
||
return HTMLResponse._from_response(r, self)
|
||
|
||
@property
|
||
def browser(self):
|
||
if not hasattr(self, "_browser"):
|
||
self.loop = asyncio.get_event_loop()
|
||
self._browser = self.loop.run_until_complete(pyppeteer.launch(ignoreHTTPSErrors=not(self.verify), headless=True, args=['--no-sandbox']))
|
||
return self._browser
|
||
|
||
def close(self):
|
||
""" If a browser was created close it first. """
|
||
if hasattr(self, "_browser"):
|
||
self.loop.run_until_complete(self._browser.close())
|
||
super().close()
|
||
|
||
|
||
class AsyncHTMLSession(requests.Session):
|
||
""" An async consumable session. """
|
||
|
||
def __init__(self, loop=None, workers=None,
|
||
mock_browser: bool = True, *args, **kwargs):
|
||
""" Set or create an event loop and a thread pool.
|
||
|
||
:param loop: Asyncio loop to use.
|
||
:param workers: Amount of threads to use for executing async calls.
|
||
If not pass it will default to the number of processors on the
|
||
machine, multiplied by 5. """
|
||
super().__init__(*args, **kwargs)
|
||
|
||
# Mock a web browser's user agent.
|
||
if mock_browser:
|
||
self.headers['User-Agent'] = user_agent()
|
||
|
||
self.hooks['response'].append(self.response_hook)
|
||
|
||
self.loop = loop or asyncio.get_event_loop()
|
||
self.thread_pool = ThreadPoolExecutor(max_workers=workers)
|
||
|
||
def response_hook(self, response, **kwargs) -> HTMLResponse:
|
||
""" Change response enconding and replace it by a HTMLResponse. """
|
||
response.encoding = DEFAULT_ENCODING
|
||
return HTMLResponse._from_response(response, self)
|
||
|
||
def request(self, *args, **kwargs):
|
||
""" Partial original request func and run it in a thread. """
|
||
func = partial(super().request, *args, **kwargs)
|
||
return self.loop.run_in_executor(self.thread_pool, func)
|