diff --git a/bake/bakefile.py b/bake/bakefile.py index 5029da7..45d7108 100644 --- a/bake/bakefile.py +++ b/bake/bakefile.py @@ -1,32 +1,246 @@ import json import os -import stat import sys from hashlib import sha256 from random import randint from shlex import quote as shlex_quote -from tempfile import mkstemp from uuid import uuid4 -import delegator + import click import networkx from . import utils from .bash import Bash - -INDENT_STYLES = ("\t", " " * 4) +from .constants import INDENT_STYLES +from .exceptions import FilterNotAvailable, NoBakefileFound, TaskNotInBashfile -class NoBakefileFound(RuntimeError): - pass +class Bakefile: + def __init__(self, *, path): + self.path = path + self.environ = os.environ + self._chunks = [] + self.args = [] + if not os.path.exists(path): + raise NoBakefileFound() -class TaskNotInBashfile(ValueError): - pass + os.environ["BAKEFILE_PATH"] = self.path + os.environ["BAKE_SKIP_DONE"] = "1" + os.environ["PYTHONUNBUFFERED"] = "1" + self.chunks + self._tasks = None + self._graph = None -class FilterNotAvailable(ValueError): - pass + @property + def graph(self): + if self._graph: + return self._graph + + g = networkx.OrderedDiGraph() + + for task in self.tasks.values(): + g.add_node(task) + for dep in task.depends_on(): + g.add_edge(task, dep) + + self._graph = g + return self.graph + + def __repr__(self): + return f"" + + def __getitem__(self, key): + return self.tasks[key] + + def _iter_chunks(self): + all_chunks = [tl for tl in self._iter_chunk_task_lines()] + task_lines = [tl if tl[1] else None for tl in self._iter_chunk_task_lines()] + + # Unsort / resort. + task_lines = list(set(task_lines)) + try: + task_lines.pop(task_lines.index(None)) + except ValueError: + pass + task_lines = sorted(task_lines, key=lambda x: x[0]) + + for i, (index, declaration_line) in enumerate(task_lines): + try: + end_index = task_lines[i + 1][0] + + except IndexError: + i = all_chunks.index((index, declaration_line)) + try: + end_index = all_chunks[i + 1][0] + except IndexError: + end_index = None + + yield self.source_lines[index:end_index] + + def _iter_chunk_task_lines(self): + for i, line in enumerate(self.source_lines): + if line: + if self._is_declaration_line(line, collect_all=True): + if self._is_declaration_line(line, collect_all=False): + yield (i, line.rstrip()) + else: + yield (i, None) + + @property + def home(self): + return os.path.dirname(self.path) + + @property + def chunks(self): + if not self._chunks: + self._chunks = [c for c in self._iter_chunks()] + return self._chunks + + def find_chunk(self, task_name): + for i, chunk in enumerate(self.chunks): + if chunk[0].split(":")[0].strip() == task_name: + return i + + def __iter__(self): + return (v for v in self.tasks.values()) + + def add_args(self, *args): + self.args.extend(args) + + def add_environ(self, key, value): + self.environ[key] = value + + def add_environ_json(self, s): + try: + j = json.loads(s) + except json.JSONDecodeError: + assert os.path.exists(s) + # Assume a path was passed, instead. + with open(s, "r") as f: + j = json.load(f) + + self.environ.update(j) + + @property + def home_path(self): + return os.path.abspath(os.path.dirname(self.path)) + + @classmethod + def find( + Class, *, filename="Bashfile", root=os.getcwd(), max_depth=4, topdown=False + ): + """Returns the path of a Pipfile in parent directories.""" + + i = 0 + for c, d, f in utils.walk_up(root): + if i > max_depth: + raise NoBakefileFound(f"No {filename} found!") + elif filename in f: + return Class(path=os.path.join(c, filename)) + i += 1 + + @property + def source(self): + with open(self.path, "r") as f: + return f.read() + + @property + def source_lines(self): + return self.source.split("\n") + + def _is_declaration_line(self, line, collect_all=False): + line = line.replace("\t", " " * 4) + + if not len(line[0].strip()): + return False + + if not self._is_comment_line(line): + if not collect_all: + if ":" in line: + return bool(len(line[:4].strip())) + else: + return bool(len(line[:4].strip())) + + @staticmethod + def _is_safe_to_inject(shebang): + # --- Note: This is kind of a clever hack, as this matches both + # bash and sh (and many other potentially–compatible shells). + return shebang.strip().endswith("sh") + + def _is_task_line(self, line): + if line.startswith(INDENT_STYLES[0]) or line.startswith(INDENT_STYLES[1]): + return True + + @staticmethod + def _is_shebang_line(line): + return line.lstrip().startswith("#!") + + @staticmethod + def _is_comment_line(line, *, exclude_shebang=True): + if exclude_shebang: + return line.strip().startswith("#") and not line.startswith("#!") + else: + return line.strip().startswith("#") + + @staticmethod + def _comment_line(line): + return f"# {line}" + + @property + def tasks(self): + if self._tasks: + return self._tasks + + tasks = {} + for i, chunk in enumerate(self.chunks): + script = TaskScript._from_chunk_index(bashfile=self, i=i) + tasks[script.name] = script + + self._tasks = tasks + return self.tasks + + @property + def iter_root_source_lines(self): + """The source of the 'root level' of the Bashfile.""" + task_active = False + for line in self.source_lines: + if line: + if self._is_declaration_line(line): + task_active = True + else: + if not self._is_task_line(line): + task_active = False + + if not task_active: + yield line + + @property + def root_source(self): + """The source of the 'root level' of the Bashfile.""" + return "\n".join(list(self.iter_root_source_lines)) + + @property + def funcs_source(self): + """Functions (_task_name), inserted into the Bash runtime.""" + source = [] + + for task in self.tasks: + task = self[task] + f_name = task.name.replace("/", "_") + f_name = f_name.replace("-", "_") + + source.append( + # Replace / namespacing with _ namespacing, for functions. + f"{f_name}()" + + " { \n" + + f" bake --silent {task.name} $@;\n" + + "}\n" + + f"declare -x {f_name};" + ) + + return "\n".join(source) class BaseAction: @@ -375,231 +589,3 @@ class TaskScript(BaseAction): @property def source_lines(self): return [s for s in self._iter_source()] - - -class Bakefile: - def __init__(self, *, path): - self.path = path - self.environ = os.environ - self._chunks = [] - self.args = [] - - if not os.path.exists(path): - raise NoBakefileFound() - - os.environ["BAKEFILE_PATH"] = self.path - os.environ["BAKE_SKIP_DONE"] = "1" - os.environ["PYTHONUNBUFFERED"] = "1" - - self.chunks - self._tasks = None - self._graph = None - - @property - def graph(self): - if self._graph: - return self._graph - - g = networkx.OrderedDiGraph() - - for task in self.tasks.values(): - g.add_node(task) - for dep in task.depends_on(): - g.add_edge(task, dep) - - self._graph = g - return self.graph - - def __repr__(self): - return f"" - - def __getitem__(self, key): - return self.tasks[key] - - def _iter_chunks(self): - all_chunks = [tl for tl in self._iter_chunk_task_lines()] - task_lines = [tl if tl[1] else None for tl in self._iter_chunk_task_lines()] - - # Unsort / resort. - task_lines = list(set(task_lines)) - try: - task_lines.pop(task_lines.index(None)) - except ValueError: - pass - task_lines = sorted(task_lines, key=lambda x: x[0]) - - for i, (index, declaration_line) in enumerate(task_lines): - try: - end_index = task_lines[i + 1][0] - - except IndexError: - i = all_chunks.index((index, declaration_line)) - try: - end_index = all_chunks[i + 1][0] - except IndexError: - end_index = None - - yield self.source_lines[index:end_index] - - def _iter_chunk_task_lines(self): - for i, line in enumerate(self.source_lines): - if line: - if self._is_declaration_line(line, collect_all=True): - if self._is_declaration_line(line, collect_all=False): - yield (i, line.rstrip()) - else: - yield (i, None) - - @property - def home(self): - return os.path.dirname(self.path) - - @property - def chunks(self): - if not self._chunks: - self._chunks = [c for c in self._iter_chunks()] - return self._chunks - - def find_chunk(self, task_name): - for i, chunk in enumerate(self.chunks): - if chunk[0].split(":")[0].strip() == task_name: - return i - - def __iter__(self): - return (v for v in self.tasks.values()) - - def add_args(self, *args): - self.args.extend(args) - - def add_environ(self, key, value): - self.environ[key] = value - - def add_environ_json(self, s): - try: - j = json.loads(s) - except json.JSONDecodeError: - assert os.path.exists(s) - # Assume a path was passed, instead. - with open(s, "r") as f: - j = json.load(f) - - self.environ.update(j) - - @property - def home_path(self): - return os.path.abspath(os.path.dirname(self.path)) - - @classmethod - def find( - Class, *, filename="Bashfile", root=os.getcwd(), max_depth=4, topdown=False - ): - """Returns the path of a Pipfile in parent directories.""" - - i = 0 - for c, d, f in utils.walk_up(root): - if i > max_depth: - raise NoBakefileFound(f"No {filename} found!") - elif filename in f: - return Class(path=os.path.join(c, filename)) - i += 1 - - @property - def source(self): - with open(self.path, "r") as f: - return f.read() - - @property - def source_lines(self): - return self.source.split("\n") - - def _is_declaration_line(self, line, collect_all=False): - line = line.replace("\t", " " * 4) - - if not len(line[0].strip()): - return False - - if not self._is_comment_line(line): - if not collect_all: - if ":" in line: - return bool(len(line[:4].strip())) - else: - return bool(len(line[:4].strip())) - - @staticmethod - def _is_safe_to_inject(shebang): - # --- Note: This is kind of a clever hack, as this matches both - # bash and sh (and many other potentially–compatible shells). - return shebang.strip().endswith("sh") - - def _is_task_line(self, line): - if line.startswith(INDENT_STYLES[0]) or line.startswith(INDENT_STYLES[1]): - return True - - @staticmethod - def _is_shebang_line(line): - return line.lstrip().startswith("#!") - - @staticmethod - def _is_comment_line(line, *, exclude_shebang=True): - if exclude_shebang: - return line.strip().startswith("#") and not line.startswith("#!") - else: - return line.strip().startswith("#") - - @staticmethod - def _comment_line(line): - return f"# {line}" - - @property - def tasks(self): - if self._tasks: - return self._tasks - - tasks = {} - for i, chunk in enumerate(self.chunks): - script = TaskScript._from_chunk_index(bashfile=self, i=i) - tasks[script.name] = script - - self._tasks = tasks - return self.tasks - - @property - def iter_root_source_lines(self): - """The source of the 'root level' of the Bashfile.""" - task_active = False - for line in self.source_lines: - if line: - if self._is_declaration_line(line): - task_active = True - else: - if not self._is_task_line(line): - task_active = False - - if not task_active: - yield line - - @property - def root_source(self): - """The source of the 'root level' of the Bashfile.""" - return "\n".join(list(self.iter_root_source_lines)) - - @property - def funcs_source(self): - """Functions (_task_name), inserted into the Bash runtime.""" - source = [] - - for task in self.tasks: - task = self[task] - f_name = task.name.replace("/", "_") - f_name = f_name.replace("-", "_") - - source.append( - # Replace / namespacing with _ namespacing, for functions. - f"{f_name}()" - + " { \n" - + f" bake --silent {task.name} $@;\n" - + "}\n" - + f"declare -x {f_name};" - ) - - return "\n".join(source) diff --git a/bake/cli.py b/bake/cli.py index 3c92a6a..8f348c9 100644 --- a/bake/cli.py +++ b/bake/cli.py @@ -3,7 +3,8 @@ import click import json import random -from .bakefile import Bakefile, TaskFilter, NoBakefileFound +from .bakefile import Bakefile, TaskFilter +from .exceptions import NoBakefileFound from .clint import eng_join import pygments diff --git a/bake/constants.py b/bake/constants.py new file mode 100644 index 0000000..0750c0b --- /dev/null +++ b/bake/constants.py @@ -0,0 +1 @@ +INDENT_STYLES = ("\t", " " * 4) diff --git a/bake/exceptions.py b/bake/exceptions.py new file mode 100644 index 0000000..a3d36b7 --- /dev/null +++ b/bake/exceptions.py @@ -0,0 +1,10 @@ +class NoBakefileFound(RuntimeError): + pass + + +class TaskNotInBashfile(ValueError): + pass + + +class FilterNotAvailable(ValueError): + pass