Update requirement parser and lockfile generation for vcs dependencies, fix test

Signed-off-by: Dan Ryan <dan.ryan@canonical.com>
This commit is contained in:
Dan Ryan
2020-05-19 13:36:23 -04:00
parent c4f6b7f6f4
commit befb29ceae
3 changed files with 228 additions and 12 deletions
+1 -1
View File
@@ -586,7 +586,7 @@ class Resolver(object):
constraints.add(line)
# ensure the top level entry remains as provided
# note that we shouldn't pin versions for editable vcs deps
if (not req.is_vcs or (req.is_vcs and not req.editable)):
if not req.is_vcs:
if req.specifiers:
locked_deps[name]["version"] = req.specifiers
elif parsed_line.setup_info and parsed_line.setup_info.version:
+225 -9
View File
@@ -696,6 +696,7 @@ class Analyzer(ast.NodeVisitor):
self.assignments = {}
self.binOps = []
self.binOps_map = {}
self.recurse = True
super(Analyzer, self).__init__()
def generic_visit(self, node):
@@ -710,6 +711,15 @@ class Analyzer(ast.NodeVisitor):
self.assignments.update(ast_unparse(node, initial_mapping=True))
super(Analyzer, self).generic_visit(node)
@contextlib.contextmanager
def no_recurse(self):
original_recurse_val = self.recurse
try:
self.recurse = False
yield
finally:
self.recurse = original_recurse_val
def visit_BinOp(self, node):
node = ast_unparse(node, initial_mapping=True)
self.binOps.append(node)
@@ -728,6 +738,202 @@ class Analyzer(ast.NodeVisitor):
iter(k for k in self.assignments if getattr(k, "id", "") == match.id), None
)
def generic_unparse(self, item):
if any(isinstance(item, k) for k in AST_BINOP_MAP.keys()):
return AST_BINOP_MAP[type(item)]
elif any(isinstance(item, k) for k in AST_COMPARATORS.keys()):
return AST_COMPARATORS[type(item)]
return item
def unparse(self, item):
unparser = getattr(
self, "unparse_{0}".format(item.__class__.__name__), self.generic_unparse
)
return unparser(item)
def unparse_Dict(self, item):
# unparsed = dict(zip(unparse(item.keys), unparse(item.values)))
return dict(
(self.unparse(k), self.unparse(v)) for k, v in zip(item.keys, item.values)
)
def unparse_List(self, item):
return [self.unparse(el) for el in item.elts]
def unparse_Tuple(self, item):
return tuple([self.unparse(el) for el in item.elts])
def unparse_Str(self, item):
return item.s
def unparse_Subscript(self, item):
unparsed = self.unparse(item.value)
if isinstance(item.slice, ast.Index):
try:
unparsed = unparsed[self.unparse(item.slice.value)]
except KeyError:
# not everything can be looked up before runtime
unparsed = item
return unparsed
def unparse_Num(self, item):
return item.n
def unparse_BinOp(self, item):
if item in self.binOps_map:
unparsed = self.binOps_map[item]
else:
right_item = self.unparse(item.right)
left_item = self.unparse(item.left)
op = getattr(item, "op", None)
op_func = self.unparse(op) if op is not None else op
try:
unparsed = op_func(left_item, right_item)
except Exception:
unparsed = (left_item, op_func, right_item)
return unparsed
def unparse_Name(self, item):
unparsed = item.id
if not self.recurse:
return unparsed
if item in self.assignments and self.recurse:
items = self.unparse(self.assignments[item])
unparsed = items.get(item.id, item.id)
else:
assignment = self.match_assignment_name(item)
if assignment is not None:
items = self.unparse(self.assignments[assignment])
unparsed = items.get(item.id, item.id)
return unparsed
def unparse_NameConstant(self, item):
return item.value
def unparse_Constant(self, item):
return item.value
def unparse_Ellipsis(self, item):
return item.value
def unparse_Attribute(self, item):
attr_name = getattr(item, "value", None)
attr_attr = getattr(item, "attr", None)
name = None
name = self.unparse(attr_name) if attr_name is not None else attr_attr
if attr_name and not self.recurse:
name = attr_name
elif name and attr_attr:
if isinstance(name, six.string_types):
unparsed = ".".join([item for item in (name, attr_attr) if item])
else:
unparsed = item
elif attr_attr and not name:
unparsed = attr_attr
else:
unparsed = name if not unparsed else unparsed
return unparsed
def unparse_Compare(self, item):
if isinstance(item.left, ast.Attribute) or isinstance(item.left, ast.Str):
import importlib
left = unparse(item.left)
if "." in left:
name, _, val = left.rpartition(".")
left = getattr(importlib.import_module(name), val, left)
comparators = []
for comparator in item.comparators:
right = self.unparse(comparator)
if isinstance(comparator, ast.Attribute) and "." in right:
name, _, val = right.rpartition(".")
right = getattr(importlib.import_module(name), val, right)
comparators.append(right)
unparsed = (left, self.unparse(item.ops), comparators)
else:
unparsed = item
return unparsed
def unparse_IfExp(self, item):
ops, truth_vals = [], []
if isinstance(item.test, ast.Compare):
left, ops, right = self.unparse(item.test)
else:
result = self.unparse(item.test)
if isinstance(result, dict):
k, v = result.popitem()
if not v:
truth_vals = [False]
for i, op in enumerate(ops):
if i == 0:
truth_vals.append(op(left, right[i]))
else:
truth_vals.append(op(right[i - 1], right[i]))
if all(truth_vals):
unparsed = self.unparse(item.body)
else:
unparsed = self.unparse(item.orelse)
return unparsed
def unparse_Call(self, item):
unparsed = {}
if isinstance(item.func, (ast.Name, ast.Attribute)):
func_name = self.unparse(item.func)
else:
try:
func_name = self.unparse(item.func)
except Exception:
func_name = None
if not func_name:
return {}
if isinstance(func_name, dict):
unparsed.update(func_name)
func_name = next(iter(func_name.keys()))
else:
unparsed[func_name] = {}
for key in ("kwargs", "keywords"):
val = getattr(item, key, [])
if val is None:
continue
for keyword in self.unparse(val):
unparsed[func_name].update(self.unparse(keyword))
return unparsed
def unparse_keyword(self, item):
return {self.unparse(item.arg): self.unparse(item.value)}
def unparse_Assign(self, item):
# XXX: DO NOT UNPARSE THIS
# XXX: If we unparse this it becomes impossible to map it back
# XXX: To the original node in the AST so we can find the
# XXX: Original reference
with self.no_recurse():
target = self.unparse(next(iter(item.targets)))
val = self.unparse(item.value)
if isinstance(target, (tuple, set, list)):
unparsed = dict(zip(target, val))
else:
unparsed = {target: val}
return unparsed
def unparse_Mapping(self, item):
unparsed = {}
for k, v in item.items():
try:
unparsed[self.unparse(k)] = self.unparse(v)
except TypeError:
unparsed[k] = self.unparse(v)
return unparsed
def unparse_list(self, item):
return type(item)([unparse(el) for el in item])
def unparse_tuple(self, item):
return self.unparse_list(item)
def unparse_str(self, item):
return item
def parse_function_names(self, should_retry=True, function_map=None):
if function_map is None:
function_map = {}
@@ -896,15 +1102,21 @@ def ast_unparse(item, initial_mapping=False, analyzer=None, recurse=True): # no
func_name = unparse(item.func)
except Exception:
func_name = None
if func_name and not isinstance(func_name, dict):
unparsed[func_name] = {}
if isinstance(func_name, dict):
unparsed.update(func_name)
func_name = next(iter(func_name.keys()))
for keyword in getattr(item, "keywords", []):
unparsed[func_name].update(unparse(keyword))
elif func_name:
unparsed[func_name] = {}
for keyword in getattr(item, "keywords", []):
unparsed[func_name].update(unparse(keyword))
if func_name:
for key in ("kwargs", "keywords"):
val = getattr(item, key, [])
if val is None:
continue
if isinstance(val, ast.Name):
unparsed[func_name] = val
else:
for keyword in unparse(val):
unparsed[func_name].update(unparse(keyword))
elif isinstance(item, ast.keyword):
unparsed = {unparse(item.arg): unparse(item.value)}
elif isinstance(item, ast.Assign):
@@ -979,6 +1191,9 @@ def ast_parse_setup_py(path):
function_names = ast_analyzer.parse_functions()
if "setup" in function_names:
setup = ast_unparse(function_names["setup"], analyzer=ast_analyzer)
keys = list(setup.keys())
if len(keys) == 1 and keys[0] is None:
_, setup = setup.popitem()
return setup
@@ -1405,8 +1620,8 @@ build-backend = "{1}"
# type: () -> Dict[S, Any]
"""Wipe existing distribution info metadata for rebuilding.
Erases metadata from **self.egg_base** and unsets **self.requirements**
and **self.extras**.
Erases metadata from **self.egg_base** and unsets
**self.requirements** and **self.extras**.
"""
for metadata_dir in os.listdir(self.egg_base):
shutil.rmtree(metadata_dir, ignore_errors=True)
@@ -1428,7 +1643,8 @@ build-backend = "{1}"
def get_egg_metadata(self, metadata_dir=None, metadata_type=None):
# type: (Optional[AnyStr], Optional[AnyStr]) -> Dict[Any, Any]
"""Given a metadata directory, return the corresponding metadata dictionary.
"""Given a metadata directory, return the corresponding metadata
dictionary.
:param Optional[str] metadata_dir: Root metadata path, default: `os.getcwd()`
:param Optional[str] metadata_type: Type of metadata to search for, default None
+2 -2
View File
@@ -744,7 +744,7 @@ def test_lock_nested_vcs_direct_url(PipenvInstance):
c = p.pipenv("install")
assert c.return_code == 0
assert "git" in p.lockfile["default"]["pep508-package"]
assert "sibling_package" in p.lockfile["default"]
assert "git" in p.lockfie["default"]["sibling-package"]
assert "sibling-package" in p.lockfile["default"]
assert "git" in p.lockfile["default"]["sibling-package"]
assert "subdirectory" in p.lockfile["default"]["sibling-package"]
assert "version" not in p.lockfile["default"]["sibling-package"]