Handle more variations in private index html to improve hash collection (#5898)

* Handle more cases of hash collection

* add news fragment
This commit is contained in:
Matt Davis
2023-09-01 05:02:12 -04:00
committed by GitHub
parent 8caed47350
commit 56d1e1cd72
2 changed files with 41 additions and 11 deletions
+1
View File
@@ -0,0 +1 @@
Handle more variations in private index html to improve hash collection.
+40 -11
View File
@@ -292,17 +292,16 @@ class Project:
return None
def get_hashes_from_remote_index_urls(self, ireq, source):
pkg_url = f"{source['url']}/{ireq.name}/"
normalized_name = normalize_name(ireq.name)
url_name = normalized_name.replace(".", "-")
pkg_url = f"{source['url']}/{url_name}/"
session = self.get_requests_session_for_source(source)
try:
collected_hashes = set()
# Grab the hashes from the new warehouse API.
response = session.get(pkg_url, timeout=10)
# Create an instance of the parser
parser = PackageIndexHTMLParser()
# Feed the HTML to the parser
parser.feed(response.text)
# Extract hrefs
hrefs = parser.urls
version = ""
@@ -310,19 +309,45 @@ class Project:
spec = next(iter(s for s in ireq.specifier), None)
if spec:
version = spec.version
# We'll check if the href looks like a version-specific page (i.e., ends with '/')
for package_url in hrefs:
if version in parse.unquote(package_url):
parsed_url = parse.urlparse(package_url)
if version in parsed_url.path and parsed_url.path.endswith("/"):
# This might be a version-specific page. Fetch and parse it
version_url = urljoin(pkg_url, package_url)
version_response = session.get(version_url, timeout=10)
version_parser = PackageIndexHTMLParser()
version_parser.feed(version_response.text)
version_hrefs = version_parser.urls
# Process these new hrefs as potential wheels
for v_package_url in version_hrefs:
url_params = parse.urlparse(v_package_url).fragment
params_dict = parse.parse_qs(url_params)
if params_dict.get(FAVORITE_HASH):
collected_hashes.add(params_dict[FAVORITE_HASH][0])
else: # Fallback to downloading the file to obtain hash
v_package_full_url = urljoin(version_url, v_package_url)
link = Link(v_package_full_url)
file_hash = self.get_file_hash(session, link)
if file_hash:
collected_hashes.add(file_hash)
elif version in parse.unquote(package_url):
# Process the current href as a potential wheel from the main page
url_params = parse.urlparse(package_url).fragment
params_dict = parse.parse_qs(url_params)
if params_dict.get(FAVORITE_HASH):
collected_hashes.add(params_dict[FAVORITE_HASH][0])
else: # Fallback to downloading the file to obtain hash
package_url = urljoin(source["url"], package_url)
link = Link(package_url)
package_full_url = urljoin(pkg_url, package_url)
link = Link(package_full_url)
file_hash = self.get_file_hash(session, link)
if file_hash:
collected_hashes.add(file_hash)
return self.prepend_hash_types(collected_hashes, FAVORITE_HASH)
except (ValueError, KeyError, ConnectionError):
if self.s.is_verbose():
click.echo(
@@ -1198,8 +1223,12 @@ class Project:
return newly_added, category, normalized_name
def src_name_from_url(self, index_url):
name, _, tld_guess = urllib.parse.urlsplit(index_url).netloc.rpartition(".")
src_name = name.replace(".", "")
location = urllib.parse.urlsplit(index_url).netloc
if "." in location:
name, _, tld_guess = location.rpartition(".")
else:
name = location
src_name = name.replace(".", "").replace(":", "")
try:
self.get_source(name=src_name)
except SourceNotFound:
@@ -1221,7 +1250,7 @@ class Project:
with contextlib.suppress(SourceNotFound):
source = self.get_source(name=index)
if source is not None:
if source is not None and source.get("name"):
return source["name"]
source = {"url": index, "verify_ssl": verify_ssl}
source["name"] = self.src_name_from_url(index)