From cf4f3a092e6b70002fa578e78c6b68cb0379fd16 Mon Sep 17 00:00:00 2001 From: Matt Davis Date: Tue, 16 May 2023 00:58:36 -0400 Subject: [PATCH] Explore utilizing hashes from URL fragments when provided. --- pipenv/utils/resolver.py | 66 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 4 deletions(-) diff --git a/pipenv/utils/resolver.py b/pipenv/utils/resolver.py index ca480fa5..63718a35 100644 --- a/pipenv/utils/resolver.py +++ b/pipenv/utils/resolver.py @@ -7,8 +7,10 @@ import sys import tempfile import warnings from functools import lru_cache +from html.parser import HTMLParser from pathlib import Path from typing import Dict, List, Optional, Set, Tuple, Union +from urllib import parse from pipenv import environments from pipenv.exceptions import RequirementError, ResolutionFailure @@ -123,6 +125,20 @@ class HashCacheMixin: return ":".join([h.name, h.hexdigest()]) +class PackageIndexHTMLParser(HTMLParser): + def __init__(self): + super().__init__() + self.urls = [] + + def handle_starttag(self, tag, attrs): + # If tag is an anchor + if tag == "a": + # find href attribute + for attr in attrs: + if attr[0] == "href": + self.urls.append(attr[1]) + + class Resolver: def __init__( self, @@ -758,6 +774,42 @@ class Resolver: ) return None + def _get_hashes_from_remote_index_urls(self, ireq, source): + pkg_url = f"{source['url']}/{ireq.name}/" + session = _get_requests_session(self.project.s.PIPENV_MAX_RETRIES) + try: + collected_hashes = set() + # Grab the hashes from the new warehouse API. + response = session.get(pkg_url, timeout=10) + # Create an instance of the parser + parser = PackageIndexHTMLParser() + # Feed the HTML to the parser + parser.feed(response.text) + # Extract hrefs + hrefs = parser.urls + + version = "" + if ireq.specifier: + spec = next(iter(s for s in ireq.specifier), None) + if spec: + version = spec.version + for package_url in hrefs: + if version in package_url: + url_params = parse.urlparse(package_url).fragment + params_dict = parse.parse_qs(url_params) + if params_dict.get(FAVORITE_HASH): + collected_hashes.add(params_dict[FAVORITE_HASH][0]) + return self.prepend_hash_types(collected_hashes, FAVORITE_HASH) + except (ValueError, KeyError, ConnectionError): + if self.project.s.is_verbose(): + click.echo( + "{}: Error generating hash for {}".format( + click.style("Warning", bold=True, fg="red"), ireq.name + ), + err=True, + ) + return None + def collect_hashes(self, ireq): link = ireq.link # Handle VCS and file links first if link and (link.is_vcs or (link.is_file and link.is_existing_dir())): @@ -773,10 +825,16 @@ class Resolver: sources = list( filter(lambda s: s.get("name") == self.index_lookup[ireq.name], sources) ) - if any(is_pypi_url(source["url"]) for source in sources): - hashes = self._get_hashes_from_pypi(ireq) - if hashes: - return hashes + source = sources[0] if len(sources) else None + if source: + if is_pypi_url(source["url"]): + hashes = self._get_hashes_from_pypi(ireq) + if hashes: + return hashes + else: + hashes = self._get_hashes_from_remote_index_urls(ireq, source) + if hashes: + return hashes applicable_candidates = self.ignore_compatibility_finder.find_best_candidate( ireq.name, ireq.specifier