minor refactor

This commit is contained in:
2019-09-21 07:25:26 -04:00
parent 4b444f07fe
commit cef6a7825e
4 changed files with 238 additions and 240 deletions
+225 -239
View File
@@ -1,32 +1,246 @@
import json
import os
import stat
import sys
from hashlib import sha256
from random import randint
from shlex import quote as shlex_quote
from tempfile import mkstemp
from uuid import uuid4
import delegator
import click
import networkx
from . import utils
from .bash import Bash
INDENT_STYLES = ("\t", " " * 4)
from .constants import INDENT_STYLES
from .exceptions import FilterNotAvailable, NoBakefileFound, TaskNotInBashfile
class NoBakefileFound(RuntimeError):
pass
class Bakefile:
def __init__(self, *, path):
self.path = path
self.environ = os.environ
self._chunks = []
self.args = []
if not os.path.exists(path):
raise NoBakefileFound()
class TaskNotInBashfile(ValueError):
pass
os.environ["BAKEFILE_PATH"] = self.path
os.environ["BAKE_SKIP_DONE"] = "1"
os.environ["PYTHONUNBUFFERED"] = "1"
self.chunks
self._tasks = None
self._graph = None
class FilterNotAvailable(ValueError):
pass
@property
def graph(self):
if self._graph:
return self._graph
g = networkx.OrderedDiGraph()
for task in self.tasks.values():
g.add_node(task)
for dep in task.depends_on():
g.add_edge(task, dep)
self._graph = g
return self.graph
def __repr__(self):
return f"<Bakefile path={self.path!r}>"
def __getitem__(self, key):
return self.tasks[key]
def _iter_chunks(self):
all_chunks = [tl for tl in self._iter_chunk_task_lines()]
task_lines = [tl if tl[1] else None for tl in self._iter_chunk_task_lines()]
# Unsort / resort.
task_lines = list(set(task_lines))
try:
task_lines.pop(task_lines.index(None))
except ValueError:
pass
task_lines = sorted(task_lines, key=lambda x: x[0])
for i, (index, declaration_line) in enumerate(task_lines):
try:
end_index = task_lines[i + 1][0]
except IndexError:
i = all_chunks.index((index, declaration_line))
try:
end_index = all_chunks[i + 1][0]
except IndexError:
end_index = None
yield self.source_lines[index:end_index]
def _iter_chunk_task_lines(self):
for i, line in enumerate(self.source_lines):
if line:
if self._is_declaration_line(line, collect_all=True):
if self._is_declaration_line(line, collect_all=False):
yield (i, line.rstrip())
else:
yield (i, None)
@property
def home(self):
return os.path.dirname(self.path)
@property
def chunks(self):
if not self._chunks:
self._chunks = [c for c in self._iter_chunks()]
return self._chunks
def find_chunk(self, task_name):
for i, chunk in enumerate(self.chunks):
if chunk[0].split(":")[0].strip() == task_name:
return i
def __iter__(self):
return (v for v in self.tasks.values())
def add_args(self, *args):
self.args.extend(args)
def add_environ(self, key, value):
self.environ[key] = value
def add_environ_json(self, s):
try:
j = json.loads(s)
except json.JSONDecodeError:
assert os.path.exists(s)
# Assume a path was passed, instead.
with open(s, "r") as f:
j = json.load(f)
self.environ.update(j)
@property
def home_path(self):
return os.path.abspath(os.path.dirname(self.path))
@classmethod
def find(
Class, *, filename="Bashfile", root=os.getcwd(), max_depth=4, topdown=False
):
"""Returns the path of a Pipfile in parent directories."""
i = 0
for c, d, f in utils.walk_up(root):
if i > max_depth:
raise NoBakefileFound(f"No {filename} found!")
elif filename in f:
return Class(path=os.path.join(c, filename))
i += 1
@property
def source(self):
with open(self.path, "r") as f:
return f.read()
@property
def source_lines(self):
return self.source.split("\n")
def _is_declaration_line(self, line, collect_all=False):
line = line.replace("\t", " " * 4)
if not len(line[0].strip()):
return False
if not self._is_comment_line(line):
if not collect_all:
if ":" in line:
return bool(len(line[:4].strip()))
else:
return bool(len(line[:4].strip()))
@staticmethod
def _is_safe_to_inject(shebang):
# --- Note: This is kind of a clever hack, as this matches both
# bash and sh (and many other potentiallycompatible shells).
return shebang.strip().endswith("sh")
def _is_task_line(self, line):
if line.startswith(INDENT_STYLES[0]) or line.startswith(INDENT_STYLES[1]):
return True
@staticmethod
def _is_shebang_line(line):
return line.lstrip().startswith("#!")
@staticmethod
def _is_comment_line(line, *, exclude_shebang=True):
if exclude_shebang:
return line.strip().startswith("#") and not line.startswith("#!")
else:
return line.strip().startswith("#")
@staticmethod
def _comment_line(line):
return f"# {line}"
@property
def tasks(self):
if self._tasks:
return self._tasks
tasks = {}
for i, chunk in enumerate(self.chunks):
script = TaskScript._from_chunk_index(bashfile=self, i=i)
tasks[script.name] = script
self._tasks = tasks
return self.tasks
@property
def iter_root_source_lines(self):
"""The source of the 'root level' of the Bashfile."""
task_active = False
for line in self.source_lines:
if line:
if self._is_declaration_line(line):
task_active = True
else:
if not self._is_task_line(line):
task_active = False
if not task_active:
yield line
@property
def root_source(self):
"""The source of the 'root level' of the Bashfile."""
return "\n".join(list(self.iter_root_source_lines))
@property
def funcs_source(self):
"""Functions (_task_name), inserted into the Bash runtime."""
source = []
for task in self.tasks:
task = self[task]
f_name = task.name.replace("/", "_")
f_name = f_name.replace("-", "_")
source.append(
# Replace / namespacing with _ namespacing, for functions.
f"{f_name}()"
+ " { \n"
+ f" bake --silent {task.name} $@;\n"
+ "}\n"
+ f"declare -x {f_name};"
)
return "\n".join(source)
class BaseAction:
@@ -375,231 +589,3 @@ class TaskScript(BaseAction):
@property
def source_lines(self):
return [s for s in self._iter_source()]
class Bakefile:
def __init__(self, *, path):
self.path = path
self.environ = os.environ
self._chunks = []
self.args = []
if not os.path.exists(path):
raise NoBakefileFound()
os.environ["BAKEFILE_PATH"] = self.path
os.environ["BAKE_SKIP_DONE"] = "1"
os.environ["PYTHONUNBUFFERED"] = "1"
self.chunks
self._tasks = None
self._graph = None
@property
def graph(self):
if self._graph:
return self._graph
g = networkx.OrderedDiGraph()
for task in self.tasks.values():
g.add_node(task)
for dep in task.depends_on():
g.add_edge(task, dep)
self._graph = g
return self.graph
def __repr__(self):
return f"<Bakefile path={self.path!r}>"
def __getitem__(self, key):
return self.tasks[key]
def _iter_chunks(self):
all_chunks = [tl for tl in self._iter_chunk_task_lines()]
task_lines = [tl if tl[1] else None for tl in self._iter_chunk_task_lines()]
# Unsort / resort.
task_lines = list(set(task_lines))
try:
task_lines.pop(task_lines.index(None))
except ValueError:
pass
task_lines = sorted(task_lines, key=lambda x: x[0])
for i, (index, declaration_line) in enumerate(task_lines):
try:
end_index = task_lines[i + 1][0]
except IndexError:
i = all_chunks.index((index, declaration_line))
try:
end_index = all_chunks[i + 1][0]
except IndexError:
end_index = None
yield self.source_lines[index:end_index]
def _iter_chunk_task_lines(self):
for i, line in enumerate(self.source_lines):
if line:
if self._is_declaration_line(line, collect_all=True):
if self._is_declaration_line(line, collect_all=False):
yield (i, line.rstrip())
else:
yield (i, None)
@property
def home(self):
return os.path.dirname(self.path)
@property
def chunks(self):
if not self._chunks:
self._chunks = [c for c in self._iter_chunks()]
return self._chunks
def find_chunk(self, task_name):
for i, chunk in enumerate(self.chunks):
if chunk[0].split(":")[0].strip() == task_name:
return i
def __iter__(self):
return (v for v in self.tasks.values())
def add_args(self, *args):
self.args.extend(args)
def add_environ(self, key, value):
self.environ[key] = value
def add_environ_json(self, s):
try:
j = json.loads(s)
except json.JSONDecodeError:
assert os.path.exists(s)
# Assume a path was passed, instead.
with open(s, "r") as f:
j = json.load(f)
self.environ.update(j)
@property
def home_path(self):
return os.path.abspath(os.path.dirname(self.path))
@classmethod
def find(
Class, *, filename="Bashfile", root=os.getcwd(), max_depth=4, topdown=False
):
"""Returns the path of a Pipfile in parent directories."""
i = 0
for c, d, f in utils.walk_up(root):
if i > max_depth:
raise NoBakefileFound(f"No {filename} found!")
elif filename in f:
return Class(path=os.path.join(c, filename))
i += 1
@property
def source(self):
with open(self.path, "r") as f:
return f.read()
@property
def source_lines(self):
return self.source.split("\n")
def _is_declaration_line(self, line, collect_all=False):
line = line.replace("\t", " " * 4)
if not len(line[0].strip()):
return False
if not self._is_comment_line(line):
if not collect_all:
if ":" in line:
return bool(len(line[:4].strip()))
else:
return bool(len(line[:4].strip()))
@staticmethod
def _is_safe_to_inject(shebang):
# --- Note: This is kind of a clever hack, as this matches both
# bash and sh (and many other potentiallycompatible shells).
return shebang.strip().endswith("sh")
def _is_task_line(self, line):
if line.startswith(INDENT_STYLES[0]) or line.startswith(INDENT_STYLES[1]):
return True
@staticmethod
def _is_shebang_line(line):
return line.lstrip().startswith("#!")
@staticmethod
def _is_comment_line(line, *, exclude_shebang=True):
if exclude_shebang:
return line.strip().startswith("#") and not line.startswith("#!")
else:
return line.strip().startswith("#")
@staticmethod
def _comment_line(line):
return f"# {line}"
@property
def tasks(self):
if self._tasks:
return self._tasks
tasks = {}
for i, chunk in enumerate(self.chunks):
script = TaskScript._from_chunk_index(bashfile=self, i=i)
tasks[script.name] = script
self._tasks = tasks
return self.tasks
@property
def iter_root_source_lines(self):
"""The source of the 'root level' of the Bashfile."""
task_active = False
for line in self.source_lines:
if line:
if self._is_declaration_line(line):
task_active = True
else:
if not self._is_task_line(line):
task_active = False
if not task_active:
yield line
@property
def root_source(self):
"""The source of the 'root level' of the Bashfile."""
return "\n".join(list(self.iter_root_source_lines))
@property
def funcs_source(self):
"""Functions (_task_name), inserted into the Bash runtime."""
source = []
for task in self.tasks:
task = self[task]
f_name = task.name.replace("/", "_")
f_name = f_name.replace("-", "_")
source.append(
# Replace / namespacing with _ namespacing, for functions.
f"{f_name}()"
+ " { \n"
+ f" bake --silent {task.name} $@;\n"
+ "}\n"
+ f"declare -x {f_name};"
)
return "\n".join(source)
+2 -1
View File
@@ -3,7 +3,8 @@ import click
import json
import random
from .bakefile import Bakefile, TaskFilter, NoBakefileFound
from .bakefile import Bakefile, TaskFilter
from .exceptions import NoBakefileFound
from .clint import eng_join
import pygments
+1
View File
@@ -0,0 +1 @@
INDENT_STYLES = ("\t", " " * 4)
+10
View File
@@ -0,0 +1,10 @@
class NoBakefileFound(RuntimeError):
pass
class TaskNotInBashfile(ValueError):
pass
class FilterNotAvailable(ValueError):
pass