import os
import yaml
import time
import magic
import logging

# NOTE(review): logging is configured before `minio` is imported, exactly as
# in the original statement order; keep it that way unless confirmed harmless.
logging.root.handlers = []
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
    datefmt="[%Y-%m-%d %H:%M:%S %z]",
)
logger = logging.getLogger(__name__)

from minio import Minio
from minio.error import ResponseError, NoSuchBucket


def _download_object(src, bucket, object_name, tmp_file):
    """Stream one object from ``src`` into ``tmp_file``.

    Returns True when the object was fully written, False when the source
    raised a ``ResponseError`` (which is logged, not propagated).
    """
    try:
        data = src.get_object(bucket, object_name)
    except ResponseError as err:
        logger.error(err)
        return False

    try:
        with open(tmp_file, 'wb') as file_data:
            for chunk in data.stream(32*1024):
                file_data.write(chunk)
    except ResponseError as err:
        logger.error(err)
        return False
    finally:
        # Hand the HTTP connection back to the pool whether or not the
        # download succeeded.
        data.close()
        data.release_conn()
    return True


def _upload_object(dst, bucket, object_name, tmp_file):
    """Upload ``tmp_file`` to ``dst`` as ``object_name``, guessing its MIME type.

    A ``ResponseError`` from the destination is logged, not propagated.
    """
    try:
        with open(tmp_file, 'rb') as file_data:
            file_stat = os.stat(tmp_file)
            mime_type = magic.from_file(tmp_file, mime=True)
            logger.debug("MIME type: {0}".format(mime_type))
            dst.put_object(bucket, object_name, file_data, file_stat.st_size, content_type=mime_type)
    except ResponseError as err:
        logger.error(err)


def mirror(src, dst, bucket, tmp_directory):
    """Make ``bucket`` on ``dst`` hold the same object names as on ``src``.

    Objects whose names exist only on the source are copied (through a scratch
    file in ``tmp_directory``); objects whose names exist only on the
    destination are removed. The source bucket policy is applied to the
    destination bucket. Comparison is by object name only: an object present
    on both sides is never re-copied, even if its content differs.

    Args:
        src: client for the source store (a ``Minio`` instance in this script).
        dst: client for the destination store.
        bucket: name of the bucket to mirror.
        tmp_directory: directory in which the scratch file ``tmp_file`` is
            written for each copied object.

    Returns:
        None. Returns early (after logging) if the source bucket is missing.
    """
    logger.debug("Processing bucket: {0}".format(bucket))

    try:
        # list_objects_v2 returns a lazy iterator; list() forces the request
        # so that NoSuchBucket is raised here.
        src_objects_list = list(src.list_objects_v2(bucket, prefix=None, recursive=True))
    except NoSuchBucket:
        logger.error('The source bucket does not exist!')
        return

    try:
        # list_objects is lazy as well; materialise it for the same reason.
        dst_objects_list = list(dst.list_objects(bucket, prefix=None, recursive=True))
    except NoSuchBucket:
        logger.error("The destination bucket does not exist: let's make it!")
        dst.make_bucket(bucket)
        # A freshly created bucket is empty. Without this binding the name
        # would be undefined below and the first sync would die on NameError.
        dst_objects_list = []

    src_object_names = {obj.object_name for obj in src_objects_list}
    dst_object_names = {obj.object_name for obj in dst_objects_list}

    policy = src.get_bucket_policy(bucket).decode('utf-8')
    logger.debug("Policy of the source bucket: {0}".format(policy))
    dst.set_bucket_policy(bucket, policy)

    object_names_to_cp = list(src_object_names - dst_object_names)
    object_names_to_rm = list(dst_object_names - src_object_names)

    if object_names_to_cp:
        logger.debug("The following objects were only found in the source bucket: {0}".format(str(object_names_to_cp)))
        # The same scratch path is reused for every object.
        tmp_file = os.path.join(tmp_directory, 'tmp_file')

        for object_name in object_names_to_cp:
            logger.info("Syncing object: {0}".format(object_name))

            # If the download failed, the scratch file holds partial data or
            # the previous object's bytes: uploading it would store wrong
            # content under this name, so skip the upload.
            if not _download_object(src, bucket, object_name, tmp_file):
                continue

            _upload_object(dst, bucket, object_name, tmp_file)
    else:
        logger.debug('No object to copy.')

    if object_names_to_rm:
        logger.debug("The following objects were only found in the destination bucket: {0}".format(str(object_names_to_rm)))
        try:
            # remove_objects yields one entry per failed deletion.
            for del_err in dst.remove_objects(bucket, object_names_to_rm):
                logger.error("Deletion Error: {}".format(del_err))
        except ResponseError as err:
            logger.error(err)
    else:
        logger.debug('No object to remove.')


if __name__ == '__main__':

    from yaml import load, dump
    try:
        from yaml import CLoader as Loader, CDumper as Dumper
    except ImportError:
        from yaml import Loader, Dumper

    # NOTE(review): Loader/CLoader is PyYAML's full (non-safe) loader, so
    # config.yaml must be a trusted file; consider SafeLoader if it is not.
    with open("config.yaml", "r") as yamlfile:
        cfg = load(yamlfile, Loader=Loader)

    logger.setLevel(cfg['loglevel'])

    src = Minio(cfg['src']['endpoint'],
                access_key=cfg['src']['access_key'],
                secret_key=cfg['src']['secret_key'],
                secure=cfg['src']['secure'])

    dst = Minio(cfg['dst']['endpoint'],
                access_key=cfg['dst']['access_key'],
                secret_key=cfg['dst']['secret_key'],
                secure=cfg['dst']['secure'])

    # Mirror every configured bucket, then sleep, forever. Any exception is
    # logged and ends the current pass; the next pass starts after the sleep.
    while True:
        try:
            for bucket in cfg['buckets'].split(','):
                mirror(src, dst, bucket, cfg['tmp_directory'])
        except Exception as e:
            logger.error(e)

        time.sleep(cfg['sleep'])