|
| 1 | +import io |
| 2 | +import os |
| 3 | +from abc import ABC |
| 4 | +from pathlib import Path |
| 5 | + |
| 6 | +import minio |
| 7 | +from minio import credentials, S3Error |
| 8 | + |
| 9 | +from omnistore.objstore.objstore import ObjStore |
| 10 | + |
| 11 | + |
| 12 | +class MinIO(ObjStore): |
| 13 | + def __init__(self, endpoint: str, bucket: str): |
| 14 | + """ |
| 15 | + Construct a new client to communicate with the provider. |
| 16 | + """ |
| 17 | + auth = credentials.EnvMinioProvider() |
| 18 | + self.client = minio.Minio(endpoint, credentials=auth,secure=False) |
| 19 | + self.bucket_name = bucket |
| 20 | + |
| 21 | + # Make sure the bucket exists |
| 22 | + if not self.client.bucket_exists(bucket): |
| 23 | + self.client.make_bucket(bucket) |
| 24 | + |
| 25 | + def create_dir(self, dirname: str): |
| 26 | + if not dirname.endswith("/"): |
| 27 | + dirname += "/" |
| 28 | + empty_stream = io.BytesIO(b"") |
| 29 | + self.client.put_object(self.bucket_name, dirname, empty_stream, 0) |
| 30 | + |
| 31 | + def delete_dir(self, dirname: str): |
| 32 | + if not dirname.endswith("/"): |
| 33 | + dirname += "/" |
| 34 | + objects = self.client.list_objects( |
| 35 | + self.bucket_name, prefix=dirname, recursive=True |
| 36 | + ) |
| 37 | + for obj in objects: |
| 38 | + self.client.remove_object(self.bucket_name, obj.object_name) |
| 39 | + |
| 40 | + def upload(self, src: str, dest: str): |
| 41 | + self.client.fput_object(self.bucket_name, dest, src) |
| 42 | + |
| 43 | + def upload_dir(self, src_dir: str, dest_dir: str): |
| 44 | + for file in Path(src_dir).rglob("*"): |
| 45 | + if file.is_file(): |
| 46 | + dest_path = f"{dest_dir}/{file.relative_to(src_dir)}" |
| 47 | + self.upload(str(file), dest_path) |
| 48 | + elif file.is_dir(): |
| 49 | + self.create_dir(f"{dest_dir}/{file.relative_to(src_dir)}/") |
| 50 | + |
| 51 | + def download(self, src: str, dest: str): |
| 52 | + self.client.fget_object(self.bucket_name, src, dest) |
| 53 | + |
| 54 | + def download_dir(self, src_dir: str, dest_dir: str): |
| 55 | + if not src_dir.endswith("/"): |
| 56 | + src_dir += "/" |
| 57 | + path = Path(dest_dir) |
| 58 | + if not path.exists(): |
| 59 | + path.mkdir(parents=True) |
| 60 | + objects = self.client.list_objects( |
| 61 | + self.bucket_name, prefix=src_dir, recursive=True |
| 62 | + ) |
| 63 | + for obj in objects: |
| 64 | + file_path = Path(dest_dir, Path(obj.object_name).relative_to(src_dir)) |
| 65 | + if not file_path.parent.exists(): |
| 66 | + file_path.parent.mkdir(parents=True, exist_ok=True) |
| 67 | + if obj.object_name.endswith("/"): |
| 68 | + continue |
| 69 | + self.download(obj.object_name, str(file_path)) |
| 70 | + |
| 71 | + def delete(self, filename: str): |
| 72 | + self.client.remove_object(self.bucket_name, filename) |
| 73 | + |
| 74 | + def exists(self, filename: str): |
| 75 | + try: |
| 76 | + self.client.stat_object(self.bucket_name, filename) |
| 77 | + return True |
| 78 | + except S3Error as e: |
| 79 | + if e.code == "NoSuchKey": |
| 80 | + return False |
| 81 | + else: |
| 82 | + raise e |
0 commit comments