mirror of
https://github.com/kennethreitz/pipenv.git
synced 2026-06-05 22:50:18 +00:00
@@ -14,6 +14,7 @@ import background
|
||||
import click
|
||||
import click_completion
|
||||
import crayons
|
||||
import dotenv
|
||||
import delegator
|
||||
import pexpect
|
||||
import requests
|
||||
@@ -22,6 +23,7 @@ import pipfile
|
||||
import pipdeptree
|
||||
import requirements
|
||||
import semver
|
||||
|
||||
from blindspin import spinner
|
||||
from requests.packages.urllib3.exceptions import InsecureRequestWarning
|
||||
from pip.req.req_file import parse_requirements
|
||||
@@ -109,6 +111,10 @@ requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
|
||||
project = Project()
|
||||
|
||||
|
||||
def load_dot_env():
|
||||
dotenv.load_dotenv(dotenv.find_dotenv(os.sep.join([project.project_directory, '.env'])), override=True)
|
||||
|
||||
|
||||
def add_to_path(p):
|
||||
"""Adds a given path to the PATH."""
|
||||
if p not in os.environ['PATH']:
|
||||
@@ -1749,6 +1755,9 @@ def shell(three=None, python=False, compat=False, shell_args=None, anyway=False)
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
# Load .env file.
|
||||
load_dot_env()
|
||||
|
||||
do_shell(three=three, python=python, compat=compat, shell_args=shell_args)
|
||||
|
||||
|
||||
@@ -1780,6 +1789,8 @@ def run(command, args, three=None, python=False):
|
||||
# Ensure that virtualenv is available.
|
||||
ensure_project(three=three, python=python, validate=False)
|
||||
|
||||
load_dot_env()
|
||||
|
||||
# Seperate out things that were passed in as a string.
|
||||
_c = list(command.split())
|
||||
command = _c.pop(0)
|
||||
|
||||
@@ -71,7 +71,6 @@ packages = [
|
||||
]
|
||||
|
||||
|
||||
|
||||
def suggest_package(package):
|
||||
"""Suggests a package name, given a package name."""
|
||||
THRESHOLD = 80
|
||||
|
||||
Vendored
+8
@@ -0,0 +1,8 @@
|
||||
from .cli import get_cli_string
|
||||
from .main import load_dotenv, get_key, set_key, unset_key, find_dotenv
|
||||
try:
|
||||
from .ipython import load_ipython_extension
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
__all__ = ['get_cli_string', 'load_dotenv', 'get_key', 'set_key', 'unset_key', 'find_dotenv', 'load_ipython_extension']
|
||||
Vendored
+98
@@ -0,0 +1,98 @@
|
||||
import os
|
||||
|
||||
import click
|
||||
|
||||
from .main import get_key, dotenv_values, set_key, unset_key
|
||||
|
||||
|
||||
@click.group()
|
||||
@click.option('-f', '--file', default=os.path.join(os.getcwd(), '.env'),
|
||||
type=click.Path(exists=True),
|
||||
help="Location of the .env file, defaults to .env file in current working directory.")
|
||||
@click.option('-q', '--quote', default='always',
|
||||
type=click.Choice(['always', 'never', 'auto']),
|
||||
help="Whether to quote or not the variable values. Default mode is always. This does not affect parsing.")
|
||||
@click.pass_context
|
||||
def cli(ctx, file, quote):
|
||||
'''This script is used to set, get or unset values from a .env file.'''
|
||||
ctx.obj = {}
|
||||
ctx.obj['FILE'] = file
|
||||
ctx.obj['QUOTE'] = quote
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def list(ctx):
|
||||
'''Display all the stored key/value.'''
|
||||
file = ctx.obj['FILE']
|
||||
dotenv_as_dict = dotenv_values(file)
|
||||
for k, v in dotenv_as_dict.items():
|
||||
click.echo('%s="%s"' % (k, v))
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
@click.argument('key', required=True)
|
||||
@click.argument('value', required=True)
|
||||
def set(ctx, key, value):
|
||||
'''Store the given key/value.'''
|
||||
file = ctx.obj['FILE']
|
||||
quote = ctx.obj['QUOTE']
|
||||
success, key, value = set_key(file, key, value, quote)
|
||||
if success:
|
||||
click.echo('%s="%s"' % (key, value))
|
||||
else:
|
||||
exit(1)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
@click.argument('key', required=True)
|
||||
def get(ctx, key):
|
||||
'''Retrieve the value for the given key.'''
|
||||
file = ctx.obj['FILE']
|
||||
stored_value = get_key(file, key)
|
||||
if stored_value:
|
||||
click.echo('%s="%s"' % (key, stored_value))
|
||||
else:
|
||||
exit(1)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
@click.argument('key', required=True)
|
||||
def unset(ctx, key):
|
||||
'''Removes the given key.'''
|
||||
file = ctx.obj['FILE']
|
||||
quote = ctx.obj['QUOTE']
|
||||
success, key = unset_key(file, key, quote)
|
||||
if success:
|
||||
click.echo("Successfully removed %s" % key)
|
||||
else:
|
||||
exit(1)
|
||||
|
||||
|
||||
def get_cli_string(path=None, action=None, key=None, value=None):
|
||||
"""Returns a string suitable for running as a shell script.
|
||||
|
||||
Useful for converting a arguments passed to a fabric task
|
||||
to be passed to a `local` or `run` command.
|
||||
"""
|
||||
command = ['dotenv']
|
||||
if path:
|
||||
command.append('-f %s' % path)
|
||||
if action:
|
||||
command.append(action)
|
||||
if key:
|
||||
command.append(key)
|
||||
if value:
|
||||
if ' ' in value:
|
||||
command.append('"%s"' % value)
|
||||
else:
|
||||
command.append(value)
|
||||
|
||||
return ' '.join(command).strip()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
Vendored
+40
@@ -0,0 +1,40 @@
|
||||
from __future__ import print_function
|
||||
from .main import load_dotenv, find_dotenv
|
||||
|
||||
from IPython.core.magic import Magics, magics_class, line_magic
|
||||
from IPython.core.magic_arguments import (argument, magic_arguments,
|
||||
parse_argstring)
|
||||
|
||||
|
||||
@magics_class
|
||||
class IPythonDotEnv(Magics):
|
||||
|
||||
@magic_arguments()
|
||||
@argument(
|
||||
'-o', '--override', action='store_true',
|
||||
help="Indicate to override existing variables"
|
||||
)
|
||||
@argument(
|
||||
'-v', '--verbose', action='store_true',
|
||||
help="Indicate function calls to be verbose"
|
||||
)
|
||||
@argument('dotenv_path', nargs='?', type=str, default='.env',
|
||||
help='Search in increasingly higher folders for the `dotenv_path`')
|
||||
@line_magic
|
||||
def dotenv(self, line):
|
||||
args = parse_argstring(self.dotenv, line)
|
||||
# Locate the .env file
|
||||
dotenv_path = args.dotenv_path
|
||||
try:
|
||||
dotenv_path = find_dotenv(dotenv_path, True, True)
|
||||
except IOError:
|
||||
print("cannot find .env file")
|
||||
return
|
||||
|
||||
# Load the .env file
|
||||
load_dotenv(dotenv_path, verbose=args.verbose, override=args.override)
|
||||
|
||||
|
||||
def load_ipython_extension(ipython):
|
||||
"""Register the %dotenv magic."""
|
||||
ipython.register_magics(IPythonDotEnv)
|
||||
Vendored
+192
@@ -0,0 +1,192 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import
|
||||
|
||||
import codecs
|
||||
import os
|
||||
import sys
|
||||
import warnings
|
||||
import re
|
||||
from collections import OrderedDict
|
||||
|
||||
__escape_decoder = codecs.getdecoder('unicode_escape')
|
||||
__posix_variable = re.compile('\$\{[^\}]*\}')
|
||||
|
||||
|
||||
def decode_escaped(escaped):
|
||||
return __escape_decoder(escaped)[0]
|
||||
|
||||
|
||||
def load_dotenv(dotenv_path, verbose=False, override=False):
|
||||
"""
|
||||
Read a .env file and load into os.environ.
|
||||
"""
|
||||
if not os.path.exists(dotenv_path):
|
||||
if verbose:
|
||||
warnings.warn("Not loading %s - it doesn't exist." % dotenv_path)
|
||||
return None
|
||||
for k, v in dotenv_values(dotenv_path).items():
|
||||
if override:
|
||||
os.environ[k] = v
|
||||
else:
|
||||
os.environ.setdefault(k, v)
|
||||
return True
|
||||
|
||||
|
||||
def get_key(dotenv_path, key_to_get):
|
||||
"""
|
||||
Gets the value of a given key from the given .env
|
||||
|
||||
If the .env path given doesn't exist, fails
|
||||
"""
|
||||
key_to_get = str(key_to_get)
|
||||
if not os.path.exists(dotenv_path):
|
||||
warnings.warn("can't read %s - it doesn't exist." % dotenv_path)
|
||||
return None
|
||||
dotenv_as_dict = dotenv_values(dotenv_path)
|
||||
if key_to_get in dotenv_as_dict:
|
||||
return dotenv_as_dict[key_to_get]
|
||||
else:
|
||||
warnings.warn("key %s not found in %s." % (key_to_get, dotenv_path))
|
||||
return None
|
||||
|
||||
|
||||
def set_key(dotenv_path, key_to_set, value_to_set, quote_mode="always"):
|
||||
"""
|
||||
Adds or Updates a key/value to the given .env
|
||||
|
||||
If the .env path given doesn't exist, fails instead of risking creating
|
||||
an orphan .env somewhere in the filesystem
|
||||
"""
|
||||
key_to_set = str(key_to_set)
|
||||
value_to_set = str(value_to_set).strip("'").strip('"')
|
||||
if not os.path.exists(dotenv_path):
|
||||
warnings.warn("can't write to %s - it doesn't exist." % dotenv_path)
|
||||
return None, key_to_set, value_to_set
|
||||
dotenv_as_dict = OrderedDict(parse_dotenv(dotenv_path))
|
||||
dotenv_as_dict[key_to_set] = value_to_set
|
||||
success = flatten_and_write(dotenv_path, dotenv_as_dict, quote_mode)
|
||||
return success, key_to_set, value_to_set
|
||||
|
||||
|
||||
def unset_key(dotenv_path, key_to_unset, quote_mode="always"):
|
||||
"""
|
||||
Removes a given key from the given .env
|
||||
|
||||
If the .env path given doesn't exist, fails
|
||||
If the given key doesn't exist in the .env, fails
|
||||
"""
|
||||
key_to_unset = str(key_to_unset)
|
||||
if not os.path.exists(dotenv_path):
|
||||
warnings.warn("can't delete from %s - it doesn't exist." % dotenv_path)
|
||||
return None, key_to_unset
|
||||
dotenv_as_dict = dotenv_values(dotenv_path)
|
||||
if key_to_unset in dotenv_as_dict:
|
||||
dotenv_as_dict.pop(key_to_unset, None)
|
||||
else:
|
||||
warnings.warn("key %s not removed from %s - key doesn't exist." % (key_to_unset, dotenv_path))
|
||||
return None, key_to_unset
|
||||
success = flatten_and_write(dotenv_path, dotenv_as_dict, quote_mode)
|
||||
return success, key_to_unset
|
||||
|
||||
|
||||
def dotenv_values(dotenv_path):
|
||||
values = OrderedDict(parse_dotenv(dotenv_path))
|
||||
values = resolve_nested_variables(values)
|
||||
return values
|
||||
|
||||
|
||||
def parse_dotenv(dotenv_path):
|
||||
with open(dotenv_path) as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line or line.startswith('#') or '=' not in line:
|
||||
continue
|
||||
k, v = line.split('=', 1)
|
||||
|
||||
# Remove any leading and trailing spaces in key, value
|
||||
k, v = k.strip(), v.strip().encode('unicode-escape').decode('ascii')
|
||||
|
||||
if len(v) > 0:
|
||||
quoted = v[0] == v[len(v) - 1] in ['"', "'"]
|
||||
|
||||
if quoted:
|
||||
v = decode_escaped(v[1:-1])
|
||||
|
||||
yield k, v
|
||||
|
||||
|
||||
def resolve_nested_variables(values):
|
||||
def _replacement(name):
|
||||
"""
|
||||
get appropriate value for a variable name.
|
||||
first search in environ, if not found,
|
||||
then look into the dotenv variables
|
||||
"""
|
||||
ret = os.getenv(name, values.get(name, ""))
|
||||
return ret
|
||||
|
||||
def _re_sub_callback(match_object):
|
||||
"""
|
||||
From a match object gets the variable name and returns
|
||||
the correct replacement
|
||||
"""
|
||||
return _replacement(match_object.group()[2:-1])
|
||||
|
||||
for k, v in values.items():
|
||||
values[k] = __posix_variable.sub(_re_sub_callback, v)
|
||||
|
||||
return values
|
||||
|
||||
|
||||
def flatten_and_write(dotenv_path, dotenv_as_dict, quote_mode="always"):
|
||||
with open(dotenv_path, "w") as f:
|
||||
for k, v in dotenv_as_dict.items():
|
||||
_mode = quote_mode
|
||||
if _mode == "auto" and " " in v:
|
||||
_mode = "always"
|
||||
str_format = '%s="%s"\n' if _mode == "always" else '%s=%s\n'
|
||||
f.write(str_format % (k, v))
|
||||
return True
|
||||
|
||||
|
||||
def _walk_to_root(path):
|
||||
"""
|
||||
Yield directories starting from the given directory up to the root
|
||||
"""
|
||||
if not os.path.exists(path):
|
||||
raise IOError('Starting path not found')
|
||||
|
||||
if os.path.isfile(path):
|
||||
path = os.path.dirname(path)
|
||||
|
||||
last_dir = None
|
||||
current_dir = os.path.abspath(path)
|
||||
while last_dir != current_dir:
|
||||
yield current_dir
|
||||
parent_dir = os.path.abspath(os.path.join(current_dir, os.path.pardir))
|
||||
last_dir, current_dir = current_dir, parent_dir
|
||||
|
||||
|
||||
def find_dotenv(filename='.env', raise_error_if_not_found=False, usecwd=False):
|
||||
"""
|
||||
Search in increasingly higher folders for the given file
|
||||
|
||||
Returns path to the file if found, or an empty string otherwise
|
||||
"""
|
||||
if usecwd or '__file__' not in globals():
|
||||
# should work without __file__, e.g. in REPL or IPython notebook
|
||||
path = os.getcwd()
|
||||
else:
|
||||
# will work for .py files
|
||||
frame_filename = sys._getframe().f_back.f_code.co_filename
|
||||
path = os.path.dirname(os.path.abspath(frame_filename))
|
||||
|
||||
for dirname in _walk_to_root(path):
|
||||
check_path = os.path.join(dirname, filename)
|
||||
if os.path.exists(check_path):
|
||||
return check_path
|
||||
|
||||
if raise_error_if_not_found:
|
||||
raise IOError('File not found')
|
||||
|
||||
return ''
|
||||
Reference in New Issue
Block a user