import time
import json
import msgpack
import dill
import pika
import os, sys
from elasticsearch import Elasticsearch, NotFoundError
fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
newPath = os.path.join(parentDir)
sys.path.append(newPath)
from lib.my_logging import logging
from lib.exit_gracefully import exit_gracefully
from lib.locker import unlock
from lib.postgis_helper import Remote
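# This worker consumes reindexation requests from RabbitMQ. For each incoming uuid it
# waits until Elasticsearch is done with any other reindex or delete_by_query task,
# removes the documents already present in the destination index, launches a reindex
# task (source index -> destination index) and, once the task has been accepted,
# publishes a sampling task (full -> sample) for the downstream worker.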
class NotEmptyQueueException(Exception):
pass
def create_sampling_task(cfg, channel, uuid):
# Below, we generate a task for the sample generator (full -> sample).
msg = dict()
msg['header'] = dict()
msg['header']['cfg'] = cfg
#msg['header']['reindex_task_url'] = reindex_task_url
msg['body'] = uuid
the_body = msgpack.packb(msg, use_bin_type=True)
# connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
# channel = connection.channel()
queue_name = cfg['rabbitmq']['queue_name_6']
routing_key = cfg['rabbitmq']['routing_key_6']
channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct')
channel.queue_declare(queue=queue_name, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
channel.queue_bind(exchange=cfg['rabbitmq']['exchange'], queue=queue_name, routing_key=routing_key)
# publish persistently (delivery_mode=2) so the task survives a broker restart
channel.basic_publish(exchange=cfg['rabbitmq']['exchange'],
                      routing_key=routing_key,
                      body=the_body,
                      properties=pika.BasicProperties(delivery_mode=2))
return
def on_msg_callback(channel, method, properties, body):
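# Steps performed for each message:
#   - decode the msgpack payload (cfg, uuid, dill-serialized reference count)
#   - postpone (nack + requeue) if Elasticsearch is still busy with reindex or delete_by_query tasks
#   - delete any documents carrying this uuid from the destination index and wait for completion
#   - submit the reindex task and, if accepted, ack the message and create the sampling task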
decoded_body = msgpack.unpackb(body, raw=False)
cfg = decoded_body['header']['cfg']
uuid = decoded_body['body']
serialized_deferred_count = decoded_body['header']['serialized_deferred_count']
deferred_count = dill.loads(serialized_deferred_count)
count_ref = deferred_count()
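# count_ref is the reference number of documents expected in the source index for this uuid;
# it is obtained by calling the dill-serialized "deferred count" callable carried in the
# message header, and is currently only used by the commented-out consistency checks below.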
# from lib.elasticsearch_template import template
# template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
# template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
# template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
if 'source_url' in cfg['reindexer'].keys():
es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60)
else:
es_source = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
# es_logger = logging.getLogger('elasticsearch')
# es_logger.setLevel(logging.INFO)
the_query = dict()
the_query['query'] = dict()
the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)}
es_source.indices.refresh(index=cfg['reindexer']['source_index'])
count_es = es_source.count(index=cfg['reindexer']['source_index'], body=the_query).get('count')
# logging.debug("%i document(s) found in the source index with uuid = %s" % (count_es, uuid))
# if uuid.endswith('.full'):
#
# logging.debug("Waiting for 5 seconds before counting again...")
# time.sleep(5)
#
# es_source.indices.refresh(index=cfg['reindexer']['source_index'])
# count2 = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count')
# logging.debug("%i document(s) found in the source index with uuid = %s" % (count2, uuid))
#
# if count1 != count2 or count2 == 0:
#
# logging.warning('Documents are still being pushed to the source index. Waiting...')
# time.sleep(5)
# channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
# return
# #raise NotEmptyQueueException('Documents are still being pushed to the source index. Waiting...')
#
# elif uuid.endswith('.meta'):
#
# if count1 != 1:
#
# logging.warning('Documents are still being pushed to the source index. Waiting...')
# time.sleep(5)
# channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
# return
#
# else:
# channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
# logging.error("The uuid ends neither with .full nor with .meta. What shall I do?")
# return
# if count_es != count_ref:
# logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid)
# logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref))
# time.sleep(5)
# channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
# return
# -1. checking whether Elasticsearch is already busy with some other reindexation tasks
es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
rep = es.tasks.list(actions="indices:data/write/reindex")
reindexation_tasks_no = 0
for node_id, node_info in rep['nodes'].items():
reindexation_tasks_no += len(node_info['tasks'].keys())
if reindexation_tasks_no > 0:
logging.info("Elasticsearch is already busy with reindexation tasks. Sleeping for 5 seconds before retrying...")
time.sleep(5)
channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
return
# 0. checking whether Elasticsearch is already busy with some other delete_by_query tasks
es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
rep = es.tasks.list(actions="indices:data/write/delete/byquery")
delete_by_query_tasks_no = 0
for node_id, node_info in rep['nodes'].items():
delete_by_query_tasks_no += len(node_info['tasks'].keys())
if delete_by_query_tasks_no > 0:
logging.info("Elasticsearch is already busy with delete_by_query tasks. Sleeping for 5 seconds before retrying...")
time.sleep(5)
channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
return
# 1. remove already existing docs from destination index
logging.info("Removing dataset with uuid = %s from the destination index..." % uuid)
#es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
index = cfg['reindexer']['destination_index']
try:
es.indices.refresh(index=index)
except NotFoundError:
# the destination index may not exist yet
pass
the_query = dict()
the_query['query'] = dict()
the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)}
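# delete_by_query is submitted with wait_for_completion=False; we then poll the Tasks API
# until it reports completion, so that the reindexation below cannot race with the deletion.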
try:
res = es.delete_by_query(index, doc_type='_doc', body=the_query, conflicts='proceed', refresh=True, wait_for_completion=False)
#logging.debug(res)
task_id = res['task']
# wait until ES is done
seconds_to_sleep_for = 1
while True:
res = es.tasks.get(task_id=task_id)
#logging.debug(res)
completed = res['completed']
if not completed:
logging.info('Waiting for delete_by_query to complete: sleeping for %i seconds...' % seconds_to_sleep_for)
time.sleep(seconds_to_sleep_for)
seconds_to_sleep_for += 1
else:
break
except NotFoundError:
pass
except Exception as e:
logging.error(e)
channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
return
# # 2. setup template
# try:
# rep = es.indices.delete_template(cfg['reindexer']['template_name'])
# logging.debug(rep)
# except:
# pass
#
# rep = es.indices.put_template(cfg['reindexer']['template_name'], template)
# # rep = es.indices.get_template("template_1")
# logging.debug(rep)
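# build the reindex request: copy the documents carrying this uuid from the source index
# into the destination index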
body = {
"source": {
"index": cfg['reindexer']['source_index'],
"query": {
"term": {"uuid.keyword": '{0}'.format(uuid)}
},
"type": "_doc",
"size": 1000
},
"dest": {
"index": cfg['reindexer']['destination_index'],
"type": "_doc"
}
}
if 'source_url' in cfg['reindexer'].keys():
body['source']['remote'] = {'host': cfg['reindexer']['source_url']}
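# note: reindexing from a remote cluster also requires the source host to be whitelisted
# via the reindex.remote.whitelist setting on the destination cluster (not handled here)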
rep = es.reindex(body, wait_for_completion=False)
logging.debug(rep)
if 'task' in rep:
channel.basic_ack(delivery_tag = method.delivery_tag)
#print("")
reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task'])
logging.info("Created reindex task: {0}".format(reindex_task_url))
# 3. create sampling task (full -> sample)
create_sampling_task(cfg, channel, uuid)#, reindex_task_url)
logging.info("Created sampling task.")
else:
channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
#print("")
#logging.error(json.dumps(rep, indent=4))
logging.error("Failed")
return
def main(cfg):
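# main opens a blocking connection to RabbitMQ and consumes reindexation requests one at a
# time (prefetch_count=1), delegating each message to on_msg_callback.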
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
#timeout = 5
#connection.add_timeout(timeout, on_timeout(connection))
channel = connection.channel()
exchange = cfg['rabbitmq_exchange']
# the queue this program will consume messages from:
reindex_tasks_to_create_qn = cfg['rabbitmq_queue']
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=reindex_tasks_to_create_qn, on_message_callback=on_msg_callback)
channel.start_consuming()
connection.close()
return
if __name__ == '__main__':
import yaml
import time
import signal
import argparse
signal.signal(signal.SIGINT, exit_gracefully)
parser = argparse.ArgumentParser(description='Incremental reindexer')
parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
args = parser.parse_args()
cfg = dict()
cfg['rabbitmq_host'] = args.host
cfg['rabbitmq_port'] = args.port
cfg['rabbitmq_exchange'] = args.exchange
cfg['rabbitmq_queue'] = args.queue
logging.getLogger().setLevel(args.loglevel)
logging.info('Starting...')
while True:
try:
main(cfg)
except pika.exceptions.ChannelClosed:
logging.info("Waiting for tasks...")
time.sleep(5)
except pika.exceptions.AMQPConnectionError:
logging.info('Waiting for RabbitMQ to be reachable...')
time.sleep(5)
except Exception as e:
logging.error(e)
time.sleep(5)