mirror of
https://github.com/kennethreitz/bruce-operator.git
synced 2026-06-05 23:20:18 +00:00
55 lines
1.4 KiB
Python
55 lines
1.4 KiB
Python
import os
|
|
|
|
import boto3
|
|
import botocore
|
|
|
|
from .env import MINIO_ACCESS_KEY, MINIO_SECRET_KEY, MINIO_SERVER, BUILDPACKS_BUCKET
|
|
|
|
os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY
|
|
os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_KEY
|
|
|
|
|
|
class Buildpacks:
|
|
def __init__(self, mino=None):
|
|
self.bucket_name = BUILDPACKS_BUCKET
|
|
self.minio = boto3.resource(
|
|
"s3",
|
|
endpoint_url=f"http://{MINIO_SERVER}",
|
|
config=boto3.session.Config(signature_version="s3v4"),
|
|
)
|
|
|
|
self.ensure_buckets()
|
|
self.bucket = self.minio.Bucket(self.bucket_name)
|
|
|
|
def ensure_buckets(self):
|
|
try:
|
|
self.minio.create_bucket(Bucket=BUILDPACKS_BUCKET)
|
|
except botocore.errorfactory.ClientError:
|
|
pass
|
|
|
|
def list(self):
|
|
keys = []
|
|
for key in self.bucket.objects.all():
|
|
print(object)
|
|
|
|
def exists(self, name):
|
|
o = self.minio.Object(self.bucket_name, name)
|
|
try:
|
|
return bool(o.get())
|
|
except botocore.errorfactory.ClientError:
|
|
return None
|
|
|
|
def get(self, name):
|
|
o = self.minio.Object(self.bucket_name, name)
|
|
try:
|
|
return o.get()["Body"].read()
|
|
except botocore.errorfactory.ClientError:
|
|
return None
|
|
|
|
def set(self, name, value):
|
|
o = self.minio.Object(self.bucket_name, name)
|
|
return o.put(Body=value)
|
|
|
|
|
|
buildpacks = Buildpacks()
|