import os import time import json from uuid import uuid4 from functools import lru_cache import logme import kubernetes import delegator from kubeconfig import KubeConfig from kubernetes.client.configuration import Configuration from kubernetes.client.api_client import ApiClient from .env import ( WATCH_NAMESPACE, API_GROUP, API_VERSION, OPERATOR_IMAGE, KUBECONFIG_PATH, IN_KUBERNETES, CERT_LOCATION, TOKEN_LOCATION, ) from .kubectl import kubectl from .buildpacks import fetch_buildpack, extract_buildpacks from .apps import App from .builds import Build # https://github.com/kubernetes-client/python/blob/master/examples/create_thirdparty_resource.md @logme.log class Operator: def __init__(self, api_client=None, fetch_buildpacks=True): # Ensure that we can load the kubeconfig. self.ensure_kubeconfig() # Load Kube configuration into module (ugh). kubernetes.config.load_kube_config() # Setup clients. self.client = kubernetes.client.CoreV1Api() self.custom_client = kubernetes.client.CustomObjectsApi(self.client.api_client) # Fetch all the buildpacks. if fetch_buildpacks: self.fetch_buildpacks() def installed_buildpacks(self, watch=False): group = API_GROUP # str | The custom resource's group name version = API_VERSION # str | The custom resource's version namespace = WATCH_NAMESPACE # str | The custom resource's namespace plural = ( "buildpacks" ) # str | The custom resource's plural name. For TPRs this would be lowercase plural kind. pretty = ( "true" ) # str | If 'true', then the output is pretty printed. (optional) watch = ( watch ) # bool | Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. (optional) try: api_response = self.custom_client.list_namespaced_custom_object( group, version, namespace, plural, pretty=pretty, watch=watch ) items = api_response["items"] except kubernetes.client.rest.ApiException: return None # Sort the buildpacks by their specified index. return sorted(items, key=lambda k: k["spec"]["index"]) @property def installed_apps(self): group = "bruce.kennethreitz.org" # str | The custom resource's group name version = "v1alpha1" # str | The custom resource's version namespace = WATCH_NAMESPACE # str | The custom resource's namespace plural = ( "apps" ) # str | The custom resource's plural name. For TPRs this would be lowercase plural kind. pretty = ( "true" ) # str | If 'true', then the output is pretty printed. (optional) watch = ( False ) # bool | Watch for changes to the described resources and return them as a stream of add, update, and remove notifications. (optional) try: api_response = self.custom_client.list_namespaced_custom_object( group, version, namespace, plural, pretty=pretty, watch=watch ) return api_response["items"] except kubernetes.client.rest.ApiException: return None def kube_spawn_self(self, cmd, label, env=None): if env is None: env = {} # TODO: ENV _hash = uuid4().hex return kubectl( f"run bruce-operator-{label}-{_hash} --image={OPERATOR_IMAGE} -n {WATCH_NAMESPACE} --restart=Never --quiet=True --record=True --image-pull-policy=Always -- bruce-operator {cmd}" ) def spawn_self(self, cmd, label, env=None): if env is None: env = {} # TODO: ENV return delegator.run(f"bruce-operator {cmd}", block=False) def ensure_kubeconfig(self): """Ensures that ~/.kube/config exists, when running in Kubernetes.""" # If we're running in a kubernets cluster... if IN_KUBERNETES and not os.path.isfile( os.path.expanduser("~/.kube/config.lock") ): host = os.environ["KUBERNETES_SERVICE_HOST"] port = os.environ["KUBERNETES_SERVICE_PORT"] # Create a KubeConfig file. kc = KubeConfig() # Read in the secret token. with open(TOKEN_LOCATION, "r") as f: token = f.read() # Set the credentials. kc.set_credentials(name="child", token=token) # Set the cluster information. kc.set_cluster( name="parent", server=f"https://{host}:{port}", certificate_authority=CERT_LOCATION, ) # Set the context. kc.set_context(name="context", cluster="parent", user="child") # Use the context. kc.use_context("context") def fetch_buildpacks(self): for i, buildpack_info in enumerate(self.installed_buildpacks()): fetch_buildpack(i=i, buildpack_info=buildpack_info) def build_app(self, app_name): apps = [App.from_info(app) for app in self.installed_apps] app = None for _app in apps: if _app.name == app_name: app = _app if not app: self.logger.info(f"App {app_name!r} not found.") return self.logger.info(f"Building {app_name}") buildpacks_dir = extract_buildpacks() build = Build( repo_url=app.repo, app_name=app.name, buildpacks_dir=buildpacks_dir ) build.build() def watch(self, fork=True, buildpacks=False, apps=False): if buildpacks and apps: raise RuntimeError("Can only watch one at a time: buildpacks and apps.") if fork: subprocesses = [] cmd = f"bruce-operator http" self.logger.info(f"Running $ {cmd} in the background.") c = delegator.run(cmd, block=False) subprocesses.append(c) for t in ("apps", "buildpacks"): cmd = f"bruce-operator watch --{t}" self.logger.info(f"Running $ {cmd} in the background.") c = delegator.run(cmd, block=False) subprocesses.append(c) self.logger.info(f"Blocking on subprocesses completion.") for subprocess in subprocesses: subprocess.block() if buildpacks: pass