mirror of
https://github.com/kennethreitz/pipenv.git
synced 2026-06-05 22:50:18 +00:00
Backport NamedTemporaryFile for io.open support
- Allows atomic locking to use `newline` argument Signed-off-by: Dan Ryan <dan@danryan.co>
This commit is contained in:
@@ -0,0 +1,273 @@
|
||||
# -*- coding=utf-8 -*-
|
||||
"""A compatibility module for pipenv's backports and manipulations.
|
||||
|
||||
Exposes a standard API that enables compatibility across python versions,
|
||||
operating systems, etc.
|
||||
"""
|
||||
import functools
|
||||
import io
|
||||
import os
|
||||
import six
|
||||
import warnings
|
||||
from tempfile import _bin_openflags, gettempdir, _mkstemp_inner, mkdtemp
|
||||
from .utils import (logging, rmtree)
|
||||
|
||||
try:
|
||||
from tempfile import _infer_return_type
|
||||
except ImportError:
|
||||
|
||||
def _infer_return_type(*args):
|
||||
_types = set()
|
||||
for arg in args:
|
||||
if isinstance(type(arg), six.string_types):
|
||||
_types.add(str)
|
||||
elif isinstance(type(arg), bytes):
|
||||
_types.add(bytes)
|
||||
else:
|
||||
_types.add(type(arg))
|
||||
return _types.pop()
|
||||
|
||||
|
||||
try:
|
||||
from weakref import finalize
|
||||
except ImportError:
|
||||
try:
|
||||
from .vendor.backports.weakref import finalize
|
||||
except ImportError:
|
||||
|
||||
class finalize(object):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
logging.warn("weakref.finalize unavailable, not cleaning...")
|
||||
|
||||
def detach(self):
|
||||
return False
|
||||
|
||||
|
||||
if six.PY2:
|
||||
|
||||
class ResourceWarning(Warning):
|
||||
pass
|
||||
|
||||
|
||||
class TemporaryDirectory(object):
|
||||
"""Create and return a temporary directory. This has the same
|
||||
behavior as mkdtemp but can be used as a context manager. For
|
||||
example:
|
||||
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
...
|
||||
|
||||
Upon exiting the context, the directory and everything contained
|
||||
in it are removed.
|
||||
"""
|
||||
|
||||
def __init__(self, suffix, prefix, dir=None):
|
||||
if "RAM_DISK" in os.environ:
|
||||
import uuid
|
||||
|
||||
name = uuid.uuid4().hex
|
||||
dir_name = os.path.join(os.environ["RAM_DISK"].strip(), name)
|
||||
os.mkdir(dir_name)
|
||||
self.name = dir_name
|
||||
else:
|
||||
self.name = mkdtemp(suffix, prefix, dir)
|
||||
self._finalizer = finalize(
|
||||
self,
|
||||
self._cleanup,
|
||||
self.name,
|
||||
warn_message="Implicitly cleaning up {!r}".format(self),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _cleanup(cls, name, warn_message):
|
||||
rmtree(name)
|
||||
warnings.warn(warn_message, ResourceWarning)
|
||||
|
||||
def __repr__(self):
|
||||
return "<{} {!r}>".format(self.__class__.__name__, self.name)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc, value, tb):
|
||||
self.cleanup()
|
||||
|
||||
def cleanup(self):
|
||||
if self._finalizer.detach():
|
||||
rmtree(self.name)
|
||||
|
||||
|
||||
def _sanitize_params(prefix, suffix, dir):
|
||||
"""Common parameter processing for most APIs in this module."""
|
||||
output_type = _infer_return_type(prefix, suffix, dir)
|
||||
if suffix is None:
|
||||
suffix = output_type()
|
||||
if prefix is None:
|
||||
if output_type is str:
|
||||
prefix = "tmp"
|
||||
else:
|
||||
prefix = os.fsencode("tmp")
|
||||
if dir is None:
|
||||
if output_type is str:
|
||||
dir = gettempdir()
|
||||
else:
|
||||
dir = os.fsencode(gettempdir())
|
||||
return prefix, suffix, dir, output_type
|
||||
|
||||
|
||||
class _TemporaryFileCloser:
|
||||
"""A separate object allowing proper closing of a temporary file's
|
||||
underlying file object, without adding a __del__ method to the
|
||||
temporary file."""
|
||||
file = None # Set here since __del__ checks it
|
||||
close_called = False
|
||||
|
||||
def __init__(self, file, name, delete=True):
|
||||
self.file = file
|
||||
self.name = name
|
||||
self.delete = delete
|
||||
|
||||
# NT provides delete-on-close as a primitive, so we don't need
|
||||
# the wrapper to do anything special. We still use it so that
|
||||
# file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
|
||||
if os.name != "nt":
|
||||
|
||||
# Cache the unlinker so we don't get spurious errors at
|
||||
# shutdown when the module-level "os" is None'd out. Note
|
||||
# that this must be referenced as self.unlink, because the
|
||||
# name TemporaryFileWrapper may also get None'd out before
|
||||
# __del__ is called.
|
||||
|
||||
def close(self, unlink=os.unlink):
|
||||
if not self.close_called and self.file is not None:
|
||||
self.close_called = True
|
||||
try:
|
||||
self.file.close()
|
||||
finally:
|
||||
if self.delete:
|
||||
unlink(self.name)
|
||||
|
||||
# Need to ensure the file is deleted on __del__
|
||||
|
||||
def __del__(self):
|
||||
self.close()
|
||||
|
||||
else:
|
||||
|
||||
def close(self):
|
||||
if not self.close_called:
|
||||
self.close_called = True
|
||||
self.file.close()
|
||||
|
||||
|
||||
class _TemporaryFileWrapper:
|
||||
"""Temporary file wrapper
|
||||
This class provides a wrapper around files opened for
|
||||
temporary use. In particular, it seeks to automatically
|
||||
remove the file when it is no longer needed.
|
||||
"""
|
||||
|
||||
def __init__(self, file, name, delete=True):
|
||||
self.file = file
|
||||
self.name = name
|
||||
self.delete = delete
|
||||
self._closer = _TemporaryFileCloser(file, name, delete)
|
||||
|
||||
def __getattr__(self, name):
|
||||
# Attribute lookups are delegated to the underlying file
|
||||
# and cached for non-numeric results
|
||||
# (i.e. methods are cached, closed and friends are not)
|
||||
file = self.__dict__["file"]
|
||||
a = getattr(file, name)
|
||||
if hasattr(a, "__call__"):
|
||||
func = a
|
||||
|
||||
@functools.wraps(func)
|
||||
def func_wrapper(*args, **kwargs):
|
||||
return func(*args, **kwargs)
|
||||
|
||||
# Avoid closing the file as long as the wrapper is alive,
|
||||
# see issue #18879.
|
||||
func_wrapper._closer = self._closer
|
||||
a = func_wrapper
|
||||
if not isinstance(a, int):
|
||||
setattr(self, name, a)
|
||||
return a
|
||||
|
||||
# The underlying __enter__ method returns the wrong object
|
||||
# (self.file) so override it to return the wrapper
|
||||
|
||||
def __enter__(self):
|
||||
self.file.__enter__()
|
||||
return self
|
||||
|
||||
# Need to trap __exit__ as well to ensure the file gets
|
||||
# deleted when used in a with statement
|
||||
|
||||
def __exit__(self, exc, value, tb):
|
||||
result = self.file.__exit__(exc, value, tb)
|
||||
self.close()
|
||||
return result
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Close the temporary file, possibly deleting it.
|
||||
"""
|
||||
self._closer.close()
|
||||
|
||||
# iter() doesn't use __getattr__ to find the __iter__ method
|
||||
|
||||
def __iter__(self):
|
||||
# Don't return iter(self.file), but yield from it to avoid closing
|
||||
# file as long as it's being used as iterator (see issue #23700). We
|
||||
# can't use 'yield from' here because iter(file) returns the file
|
||||
# object itself, which has a close method, and thus the file would get
|
||||
# closed when the generator is finalized, due to PEP380 semantics.
|
||||
for line in self.file:
|
||||
yield line
|
||||
|
||||
|
||||
def NamedTemporaryFile(
|
||||
mode="w+b",
|
||||
buffering=-1,
|
||||
encoding=None,
|
||||
newline=None,
|
||||
suffix=None,
|
||||
prefix=None,
|
||||
dir=None,
|
||||
delete=True,
|
||||
):
|
||||
"""Create and return a temporary file.
|
||||
Arguments:
|
||||
'prefix', 'suffix', 'dir' -- as for mkstemp.
|
||||
'mode' -- the mode argument to io.open (default "w+b").
|
||||
'buffering' -- the buffer size argument to io.open (default -1).
|
||||
'encoding' -- the encoding argument to io.open (default None)
|
||||
'newline' -- the newline argument to io.open (default None)
|
||||
'delete' -- whether the file is deleted on close (default True).
|
||||
The file is created as mkstemp() would do it.
|
||||
Returns an object with a file-like interface; the name of the file
|
||||
is accessible as its 'name' attribute. The file will be automatically
|
||||
deleted when it is closed unless the 'delete' argument is set to False.
|
||||
"""
|
||||
prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir)
|
||||
flags = _bin_openflags
|
||||
# Setting O_TEMPORARY in the flags causes the OS to delete
|
||||
# the file when it is closed. This is only supported by Windows.
|
||||
if os.name == "nt" and delete:
|
||||
flags |= os.O_TEMPORARY
|
||||
if six.PY2:
|
||||
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
|
||||
else:
|
||||
(fd, name) = _mkstemp_inner(dir, prefix, suffix, flags, output_type)
|
||||
try:
|
||||
file = io.open(
|
||||
fd, mode, buffering=buffering, newline=newline, encoding=encoding
|
||||
)
|
||||
return _TemporaryFileWrapper(file, name, delete)
|
||||
|
||||
except BaseException:
|
||||
os.unlink(name)
|
||||
os.close(fd)
|
||||
raise
|
||||
+3
-1
@@ -45,10 +45,12 @@ from .utils import (
|
||||
get_requirement,
|
||||
is_pinned,
|
||||
is_star,
|
||||
TemporaryDirectory,
|
||||
rmtree,
|
||||
split_argument,
|
||||
)
|
||||
from ._compat import (
|
||||
TemporaryDirectory,
|
||||
)
|
||||
from .import pep508checker, progress
|
||||
from .environments import (
|
||||
PIPENV_COLORBLIND,
|
||||
|
||||
+17
-57
@@ -644,6 +644,7 @@ def is_pinned(val):
|
||||
|
||||
def convert_deps_to_pip(deps, project=None, r=True, include_index=False):
|
||||
""""Converts a Pipfile-formatted dependency to a pip-formatted one."""
|
||||
from ._compat import NamedTemporaryFile
|
||||
dependencies = []
|
||||
for dep in deps.keys():
|
||||
# Default (e.g. '>1.10').
|
||||
@@ -739,7 +740,7 @@ def convert_deps_to_pip(deps, project=None, r=True, include_index=False):
|
||||
return dependencies
|
||||
|
||||
# Write requirements.txt to tmp directory.
|
||||
f = tempfile.NamedTemporaryFile(suffix='-requirements.txt', delete=False)
|
||||
f = NamedTemporaryFile(suffix='-requirements.txt', delete=False)
|
||||
f.write('\n'.join(dependencies).encode('utf-8'))
|
||||
f.close()
|
||||
return f.name
|
||||
@@ -1214,8 +1215,9 @@ def is_readonly_path(fn):
|
||||
|
||||
|
||||
def set_write_bit(fn):
|
||||
if os.path.exists(fn):
|
||||
os.chmod(fn, stat.S_IWRITE | stat.S_IWUSR)
|
||||
if isinstance(fn, six.string_types) and not os.path.exists(fn):
|
||||
return
|
||||
os.chmod(fn, stat.S_IWRITE | stat.S_IWUSR | stat.S_IRUSR)
|
||||
return
|
||||
|
||||
|
||||
@@ -1253,54 +1255,6 @@ def handle_remove_readonly(func, path, exc):
|
||||
raise
|
||||
|
||||
|
||||
class TemporaryDirectory(object):
|
||||
"""Create and return a temporary directory. This has the same
|
||||
behavior as mkdtemp but can be used as a context manager. For
|
||||
example:
|
||||
|
||||
with TemporaryDirectory() as tmpdir:
|
||||
...
|
||||
|
||||
Upon exiting the context, the directory and everything contained
|
||||
in it are removed.
|
||||
"""
|
||||
|
||||
def __init__(self, suffix, prefix, dir=None):
|
||||
if 'RAM_DISK' in os.environ:
|
||||
import uuid
|
||||
|
||||
name = uuid.uuid4().hex
|
||||
dir_name = os.path.join(os.environ['RAM_DISK'].strip(), name)
|
||||
os.mkdir(dir_name)
|
||||
self.name = dir_name
|
||||
else:
|
||||
self.name = tempfile.mkdtemp(suffix, prefix, dir)
|
||||
self._finalizer = finalize(
|
||||
self,
|
||||
self._cleanup,
|
||||
self.name,
|
||||
warn_message="Implicitly cleaning up {!r}".format(self),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def _cleanup(cls, name, warn_message):
|
||||
rmtree(name)
|
||||
warnings.warn(warn_message, ResourceWarning)
|
||||
|
||||
def __repr__(self):
|
||||
return "<{} {!r}>".format(self.__class__.__name__, self.name)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc, value, tb):
|
||||
self.cleanup()
|
||||
|
||||
def cleanup(self):
|
||||
if self._finalizer.detach():
|
||||
rmtree(self.name)
|
||||
|
||||
|
||||
def split_argument(req, short=None, long_=None):
|
||||
"""Split an argument from a string (finds None if not present).
|
||||
|
||||
@@ -1326,7 +1280,7 @@ def split_argument(req, short=None, long_=None):
|
||||
|
||||
|
||||
@contextmanager
|
||||
def atomic_open_for_write(target, binary=False):
|
||||
def atomic_open_for_write(target, binary=False, newline=None, encoding=None):
|
||||
"""Atomically open `target` for writing.
|
||||
|
||||
This is based on Lektor's `atomic_open()` utility, but simplified a lot
|
||||
@@ -1342,18 +1296,24 @@ def atomic_open_for_write(target, binary=False):
|
||||
* If everything goes well, close the temp file, and replace the actual
|
||||
target with this new file.
|
||||
"""
|
||||
fd, tmp = tempfile.mkstemp(
|
||||
from ._compat import NamedTemporaryFile
|
||||
mode = 'w+b' if binary else 'w'
|
||||
f = NamedTemporaryFile(
|
||||
dir=os.path.dirname(target),
|
||||
prefix='.__atomic-write',
|
||||
mode=mode,
|
||||
encoding=encoding,
|
||||
newline=newline,
|
||||
delete=False,
|
||||
)
|
||||
os.chmod(tmp, 0o644)
|
||||
f = os.fdopen(fd, 'wb' if binary else 'w')
|
||||
# set permissions to 0644
|
||||
os.chmod(f.name, stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
|
||||
try:
|
||||
yield f
|
||||
except BaseException:
|
||||
f.close()
|
||||
try:
|
||||
os.remove(tmp)
|
||||
os.remove(f.name)
|
||||
except OSError:
|
||||
pass
|
||||
raise
|
||||
@@ -1363,4 +1323,4 @@ def atomic_open_for_write(target, binary=False):
|
||||
os.remove(target) # This is needed on Windows.
|
||||
except OSError:
|
||||
pass
|
||||
os.rename(tmp, target) # No os.replace() on Python 2.
|
||||
os.rename(f.name, target) # No os.replace() on Python 2.
|
||||
|
||||
@@ -3,7 +3,8 @@
|
||||
# Taken from pip
|
||||
# see https://github.com/pypa/pip/blob/95bcf8c5f6394298035a7332c441868f3b0169f4/tasks/vendoring/__init__.py
|
||||
from pathlib import Path
|
||||
from pipenv.utils import TemporaryDirectory, mkdir_p
|
||||
from pipenv._compat import TemporaryDirectory
|
||||
from pipenv.utils import mkdir_p
|
||||
# from tempfile import TemporaryDirectory
|
||||
import tarfile
|
||||
import zipfile
|
||||
|
||||
@@ -4,7 +4,7 @@ import warnings
|
||||
|
||||
import pytest
|
||||
|
||||
from pipenv.utils import TemporaryDirectory
|
||||
from pipenv._compat import TemporaryDirectory
|
||||
from pipenv.vendor import delegator
|
||||
from pipenv.vendor import requests
|
||||
from pipenv.vendor import six
|
||||
|
||||
Reference in New Issue
Block a user