Skip to content
Snippets Groups Projects
main.py 4.28 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import os
    import yaml
    import time
    import magic
    
    import logging
    # Drop whatever handlers are already attached to the root logger so that
    # the basicConfig() call below is not a no-op and installs our formatter.
    logging.root.handlers = []
    _LOG_FORMAT = "%(asctime)s [%(levelname)s] [%(name)s] %(message)s"
    _LOG_DATEFMT = "[%Y-%m-%d %H:%M:%S %z]"
    logging.basicConfig(format=_LOG_FORMAT, datefmt=_LOG_DATEFMT)

    # Module-level logger; its level is set from config.yaml ('loglevel') in
    # the __main__ block.
    logger = logging.getLogger(__name__)
    
    from minio import Minio
    
    from minio.error import ResponseError, NoSuchBucket
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    def mirror(src, dst, bucket, tmp_directory):
        """Make ``bucket`` on ``dst`` mirror the same-named bucket on ``src``.

        Objects are compared by name only. Objects found only on ``src`` are
        downloaded to a scratch file in ``tmp_directory`` and uploaded to
        ``dst``. Objects found only on ``dst`` are deleted from it. The source
        bucket policy is then copied to the destination. Objects present on
        both sides are left untouched, even if their contents differ.

        If the destination bucket does not exist it is created first. If the
        source bucket does not exist, an error is logged and nothing is done.

        :param src: Minio client for the source endpoint.
        :param dst: Minio client for the destination endpoint.
        :param bucket: name of the bucket to mirror (same name on both sides).
        :param tmp_directory: directory that holds the per-object scratch file.
        """
        logger.debug("Processing bucket: %s", bucket)

        try:
            # list_objects_v2 returns a lazy iterator; list() forces the
            # listing so that NoSuchBucket surfaces inside this try.
            src_objects_list = list(src.list_objects_v2(bucket, prefix=None, recursive=True))
        except NoSuchBucket:
            logger.error('The source bucket does not exist!')
            return

        try:
            # list_objects is lazy as well; list() forces the listing here.
            dst_objects_list = list(dst.list_objects(bucket, prefix=None, recursive=True))
        except NoSuchBucket:
            logger.error("The destination bucket does not exist: let's make it!")
            dst.make_bucket(bucket)
            # A freshly created bucket is empty. Without this assignment the
            # name was unbound below and the first sync raised NameError.
            dst_objects_list = []

        src_object_names = {obj.object_name for obj in src_objects_list}
        dst_object_names = {obj.object_name for obj in dst_objects_list}

        policy = src.get_bucket_policy(bucket).decode('utf-8')
        logger.debug("Policy of the source bucket: %s", policy)
        dst.set_bucket_policy(bucket, policy)

        # sorted() keeps processing and log output in a deterministic order.
        object_names_to_cp = sorted(src_object_names - dst_object_names)
        object_names_to_rm = sorted(dst_object_names - src_object_names)

        if object_names_to_cp:
            logger.debug("The following objects were only found in the source bucket: %s", object_names_to_cp)
            # One scratch file is reused (overwritten) for every object.
            tmp_file = os.path.join(tmp_directory, 'tmp_file')
            for object_name in object_names_to_cp:
                logger.info("Syncing object: %s", object_name)
                _copy_object(src, dst, bucket, object_name, tmp_file)
        else:
            logger.debug('No object to copy.')

        if object_names_to_rm:
            logger.debug("The following objects were only found in the destination bucket: %s", object_names_to_rm)
            try:
                # Per the minio-py docs, remove_objects performs the deletion
                # lazily while its result is iterated, yielding only errors.
                for del_err in dst.remove_objects(bucket, object_names_to_rm):
                    logger.error("Deletion Error: %s", del_err)
            except ResponseError as err:
                logger.error(err)
        else:
            logger.debug('No object to remove.')


    def _copy_object(src, dst, bucket, object_name, tmp_file):
        """Copy one object from ``src`` to ``dst`` through ``tmp_file``.

        A ``ResponseError`` on either the download or the upload is logged and
        the object is skipped. The upload is never attempted after a failed
        download.
        """
        try:
            data = src.get_object(bucket, object_name)
            try:
                with open(tmp_file, 'wb') as file_data:
                    for chunk in data.stream(32 * 1024):
                        file_data.write(chunk)
            finally:
                # Release the HTTP response so its pooled connection is reused
                # rather than leaked (as recommended by the minio-py docs).
                data.close()
                data.release_conn()
        except ResponseError as err:
            logger.error(err)
            # tmp_file may be missing, partial, or still hold the previous
            # object's bytes, so it must not be uploaded under this name.
            return

        try:
            mime_type = magic.from_file(tmp_file, mime=True)
            logger.debug("MIME type: %s", mime_type)
            with open(tmp_file, 'rb') as file_data:
                dst.put_object(bucket, object_name, file_data,
                               os.stat(tmp_file).st_size, content_type=mime_type)
        except ResponseError as err:
            logger.error(err)
    
    
    
    if __name__ == '__main__':

        # NOTE(review): the full (non-safe) Loader is used on a local config
        # file; switch to SafeLoader/CSafeLoader if that file is ever
        # supplied by an untrusted party.
        from yaml import load
        try:
            from yaml import CLoader as Loader
        except ImportError:
            from yaml import Loader

        with open("config.yaml", "r") as yamlfile:
            cfg = load(yamlfile, Loader=Loader)

        logger.setLevel(cfg['loglevel'])

        src = Minio(cfg['src']['endpoint'],
                    access_key=cfg['src']['access_key'],
                    secret_key=cfg['src']['secret_key'],
                    secure=cfg['src']['secure'])

        dst = Minio(cfg['dst']['endpoint'],
                    access_key=cfg['dst']['access_key'],
                    secret_key=cfg['dst']['secret_key'],
                    secure=cfg['dst']['secure'])

        # 'buckets' is a comma-separated string (original format) or a YAML
        # list. Surrounding whitespace and empty entries are ignored, so
        # "a, b," yields ['a', 'b'].
        buckets = cfg['buckets']
        if isinstance(buckets, str):
            buckets = buckets.split(',')
        buckets = [str(name).strip() for name in buckets if str(name).strip()]

        # mirror() writes its scratch file into this directory; create it up
        # front so copies do not all fail when it is missing.
        os.makedirs(cfg['tmp_directory'], exist_ok=True)

        while True:
            for bucket in buckets:
                # Isolate failures per bucket: one broken bucket must not stop
                # the remaining ones from being mirrored in this cycle.
                try:
                    mirror(src, dst, bucket, cfg['tmp_directory'])
                except Exception:
                    logger.exception("Failed to mirror bucket: %s", bucket)

            time.sleep(cfg['sleep'])