Newer
Older
import os
import yaml
import time
import magic
import logging
# basicConfig() is a no-op when the root logger already has handlers, so
# start from an empty handler list to make sure the format below applies.
logging.getLogger().handlers = []
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
    datefmt="[%Y-%m-%d %H:%M:%S %z]",
)
# Module logger; its level is set from the config file at startup.
logger = logging.getLogger(__name__)
from minio import Minio
from minio.error import ResponseError, NoSuchBucket
def _copy_object(src, dst, bucket, object_name, tmp_file):
    """Copy one object from ``src`` to ``dst``, staging it in ``tmp_file``.

    ``ResponseError`` from the download or the upload is logged rather than
    raised, so one failing object does not stop the rest of the pass. The
    scratch file is removed afterwards in every case.
    """
    try:
        try:
            data = src.get_object(bucket, object_name)
            try:
                with open(tmp_file, 'wb') as file_data:
                    for chunk in data.stream(32 * 1024):
                        file_data.write(chunk)
            finally:
                # Hand the underlying HTTP connection back to the client.
                data.close()
                data.release_conn()
        except ResponseError as err:
            logger.error(err)
            # Never upload after a failed download: tmp_file is partial or
            # may still hold the content of a previously copied object.
            return

        try:
            file_size = os.stat(tmp_file).st_size
            mime_type = magic.from_file(tmp_file, mime=True)
            logger.debug("MIME type: {0}".format(mime_type))
            with open(tmp_file, 'rb') as file_data:
                dst.put_object(bucket, object_name, file_data,
                               file_size, content_type=mime_type)
        except ResponseError as err:
            logger.error(err)
    finally:
        # Do not leave a copy of the object's data lying around on disk.
        try:
            os.remove(tmp_file)
        except OSError:
            pass


def mirror(src, dst, bucket, tmp_directory):
    """Make bucket ``bucket`` on ``dst`` mirror the bucket of the same name on ``src``.

    One synchronisation pass:

    1. list every object in the source bucket (log and give up if the source
       bucket does not exist);
    2. list every object in the destination bucket, creating the bucket when
       it does not exist yet;
    3. copy the source bucket policy onto the destination bucket;
    4. copy every object whose name exists only on the source;
    5. delete every object whose name exists only on the destination.

    Objects are compared by name only: an object present on both sides is
    never re-copied, even if its content has changed.

    :param src: Minio client for the source server.
    :param dst: Minio client for the destination server.
    :param bucket: name of the bucket to mirror (same name on both sides).
    :param tmp_directory: directory (expected to exist) in which each object
        is staged between download and upload.
    """
    logger.debug("Processing bucket: {0}".format(bucket))

    try:
        # list_objects_v2 returns a lazy iterator; it is consumed inside the
        # ``try`` so that a missing bucket is reported here.
        src_object_names = {
            obj.object_name
            for obj in src.list_objects_v2(bucket, prefix=None, recursive=True)
        }
    except NoSuchBucket:
        logger.error('The source bucket does not exist!')
        return

    try:
        # The destination is listed with ``list_objects`` (not v2); this is
        # also lazy, so it is likewise consumed inside the ``try``.
        dst_object_names = {
            obj.object_name
            for obj in dst.list_objects(bucket, prefix=None, recursive=True)
        }
    except NoSuchBucket:
        logger.error("The destination bucket does not exist: let's make it!")
        dst.make_bucket(bucket)
        # A freshly created bucket is empty; without this the name set would
        # be unbound below and the pass would crash.
        dst_object_names = set()

    # NOTE(review): depending on the minio version, get_bucket_policy() may
    # raise when the source bucket has no policy; that is not handled here
    # and aborts this bucket's pass — confirm this is intended.
    policy = src.get_bucket_policy(bucket).decode('utf-8')
    logger.debug("Policy of the source bucket: {0}".format(policy))
    dst.set_bucket_policy(bucket, policy)

    # Sorted for a deterministic processing and logging order.
    object_names_to_cp = sorted(src_object_names - dst_object_names)
    object_names_to_rm = sorted(dst_object_names - src_object_names)

    if object_names_to_cp:
        logger.debug("The following objects were only found in the source bucket: {0}".format(str(object_names_to_cp)))
        tmp_file = os.path.join(tmp_directory, 'tmp_file')
        for object_name in object_names_to_cp:
            logger.info("Syncing object: {0}".format(object_name))
            _copy_object(src, dst, bucket, object_name, tmp_file)
    else:
        logger.debug('No object to copy.')

    if object_names_to_rm:
        logger.debug("The following objects were only found in the destination bucket: {0}".format(str(object_names_to_rm)))
        try:
            # remove_objects() is lazy: deletions are only performed while the
            # returned iterator of per-object errors is being consumed.
            for del_err in dst.remove_objects(bucket, object_names_to_rm):
                logger.error("Deletion Error: {}".format(del_err))
        except ResponseError as err:
            logger.error(err)
    else:
        logger.debug('No object to remove.')
if __name__ == '__main__':
    # The config only needs plain scalars, lists and mappings. safe_load
    # refuses the arbitrary-Python-object tags that the full Loader would
    # construct.
    with open("config.yaml", "r") as yamlfile:
        cfg = yaml.safe_load(yamlfile)

    logger.setLevel(cfg['loglevel'])

    def _make_client(side):
        """Build a Minio client from the ``cfg[side]`` connection settings."""
        conf = cfg[side]
        return Minio(conf['endpoint'],
                     access_key=conf['access_key'],
                     secret_key=conf['secret_key'],
                     secure=conf['secure'])

    src = _make_client('src')
    dst = _make_client('dst')

    # Accept either a comma-separated string or a YAML list, and tolerate
    # surrounding spaces and trailing commas ("a, b," -> ["a", "b"]).
    buckets = cfg['buckets']
    if isinstance(buckets, str):
        buckets = buckets.split(',')
    buckets = [str(name).strip() for name in buckets if str(name).strip()]

    while True:
        for bucket in buckets:
            # Isolate failures per bucket: one broken bucket must neither stop
            # the others from being mirrored nor kill the daemon.
            try:
                mirror(src, dst, bucket, cfg['tmp_directory'])
            except Exception:
                logger.exception("Error while mirroring bucket {0}".format(bucket))
        time.sleep(cfg['sleep'])