diff --git a/workers/reindexer-oo.py b/workers/reindexer-oo.py new file mode 100644 index 0000000000000000000000000000000000000000..3e24a39f3e4df4482a53f9abacd5a41c504ba1d0 --- /dev/null +++ b/workers/reindexer-oo.py @@ -0,0 +1,246 @@ +import time +import json +import msgpack +import pika +import os, sys +from elasticsearch import Elasticsearch, NotFoundError + +fileDir = os.path.dirname(os.path.abspath(__file__)) +parentDir = os.path.dirname(fileDir) +newPath = os.path.join(parentDir) +sys.path.append(newPath) +from lib.my_logging import logging +from lib.exit_gracefully import exit_gracefully +from lib.locker import unlock + +from lib.rabbit_session import RabbitSession +from lib.log_message import LogMessage + +class NotEmptyQueueException(Exception): + pass + + +class Reindexer : + + def __init__(self, cfg): + self.cfg = cfg + self.rabbit = None + + def create_sampling_task(self, cfg, channel, uuid): + + # here-below we generate a task for the sample generator (full -> meta) + msg = dict() + msg['header'] = dict() + msg['header']['cfg'] = cfg + #msg['header']['reindex_task_url'] = reindex_task_url + msg['body'] = uuid + + the_task_body = msgpack.packb(msg, use_bin_type=True) + + exchange = cfg['rabbitmq']['exchange'] + + queue_name = cfg['rabbitmq']['queue_name_6'] + routing_key = cfg['rabbitmq']['routing_key_6'] + + # ------------------------send task----------------------------------- + self.rabbit.publish_task(the_body=the_task_body, + exchange=exchange, + routing_key=routing_key, + queue_name=queue_name) + # --------------------------------------------------------------------- + + return + + def on_msg_callback(self, channel, method, properties, body): + + + decoded_body = msgpack.unpackb(body, raw=False) + + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], + # session_id=cfg['session']['id'], + uuid=decoded_body['header']['cfg']['session']['current_uuid'], + step='reindexer', + status='Starting...', + uuid_prefix='meta', + info='no info' + ) + self.rabbit.publish_log(log_message=log_message.__dict__) + # ------------------------------------------------------------ + + cfg = decoded_body['header']['cfg'] + uuid = decoded_body['body'] + count_ref = decoded_body['header']['count'] + + + if 'source_url' in cfg['reindexer'].keys(): + es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) + else: + es_source = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) + + + + the_query = dict() + the_query['query'] = dict() + the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)} + + es_source.indices.refresh(index=cfg['reindexer']['source_index']) + count_es = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count') + + + + if count_es != count_ref: + logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid) + logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref)) + time.sleep(5) + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + return + + + # 1. remove already existing docs from destination index + logging.info("Removing dataset with uuid = %s from the destination index..." 
% uuid) + + es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) + index = cfg['reindexer']['destination_index'] + + try: + es.indices.refresh(index=index) + except NotFoundError: + # the destination index may not be already present + pass + + the_query = dict() + the_query['query'] = dict() + the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)} + + try: + res = es.delete_by_query(index, doc_type='_doc', body=the_query) + logging.debug(res) + res = es.indices.refresh(index=index) + logging.debug(res) + except NotFoundError: + pass + except Exception as e: + logging.error(e) + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + return + + + # 3. trigger reindexation + + body = { + "source": { + "index": cfg['reindexer']['source_index'], + "query": { + "term": {"uuid.keyword": '{0}'.format(uuid)} + }, + "type": "_doc", + "size": 1000 + }, + "dest": { + "index": cfg['reindexer']['destination_index'], + "type": "_doc" + } + } + + if 'source_url' in cfg['reindexer'].keys(): + body['source']['remote'] = {'host': cfg['reindexer']['source_url']} + + rep = es.reindex(body, wait_for_completion=False) + + logging.debug(rep) + + if 'task' in rep: + channel.basic_ack(delivery_tag=method.delivery_tag) + #print("") + reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task']) + logging.info("Created reindex task: {0}".format(reindex_task_url)) + + # 3. create sampling task (full -> meta) + if '.full' in uuid: + self.create_sampling_task(cfg, channel, uuid)#, reindex_task_url) + logging.info("Created sampling task.") + # otherwise, remove the lock + else: + logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.meta', '')) + unlock( cfg['session']['working_directory'], uuid.replace('.meta', '')) + + + + else: + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + #print("") + logging.error(json.dumps(rep, indent=4)) + logging.error("Failed") + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], + # session_id=cfg['session']['id'], + uuid=decoded_body['header']['cfg']['session']['current_uuid'], + step='reindexer', + status='Terminated', + uuid_prefix='meta', + info='no info' + ) + self.rabbit.publish_log(log_message=log_message.__dict__) + # ------------------------------------------------------------ + + return + + def main(self): + + with RabbitSession(self.cfg) as self.rabbit: + # ------------------------------------------------------------ + docs_to_enrich_qn = cfg['rabbitmq_queue'] + self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.on_msg_callback, + specific_queue=docs_to_enrich_qn) + + + return + + +if __name__ == '__main__': + + import yaml + import time + import signal + import argparse + + signal.signal(signal.SIGINT, exit_gracefully) + + parser = argparse.ArgumentParser(description='Incremental reindexer') + parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True) + parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672) + parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) + parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) + parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + + args = 
parser.parse_args() + + cfg = dict() + cfg['rabbitmq'] = dict() + cfg['rabbitmq_host'] = args.host + cfg['rabbitmq_port'] = args.port + cfg['rabbitmq_exchange'] = args.exchange + cfg['rabbitmq_queue'] = args.queue + cfg['rabbitmq']['user'] = 'admin' + cfg['rabbitmq']['password'] = 'admin' + cfg['rabbitmq']['queue_logs_name'] = 'session_logs' + cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' + cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' + + logging.getLogger().setLevel(args.loglevel) + logging.info('Starting...') + + while True: + try: + Reindexer(cfg).main() + except pika.exceptions.ChannelClosed: + logging.info("Waiting for tasks...") + time.sleep(5) + except pika.exceptions.AMQPConnectionError: + logging.info('Waiting for RabbitMQ to be reachable...') + time.sleep(5) + except Exception as e: + logging.error(e) + time.sleep(5) + exit(1) diff --git a/workers/reindexer.py b/workers/reindexer.py index 8ef44395904ca1cefed8cf502b74ffcb8f234ff8..2bcfb66e3348984f41b4ff1f5b1afad2e48a280d 100644 --- a/workers/reindexer.py +++ b/workers/reindexer.py @@ -12,267 +12,251 @@ sys.path.append(newPath) from lib.my_logging import logging from lib.exit_gracefully import exit_gracefully from lib.locker import unlock - -from lib.rabbit_session import RabbitSession from lib.log_message import LogMessage class NotEmptyQueueException(Exception): pass -class Reindexer : - - def __init__(self, cfg): - self.cfg = cfg - self.rabbit = None - - def create_sampling_task(self, cfg, channel, uuid): - - # here-below we generate a task for the sample generator (full -> meta) - msg = dict() - msg['header'] = dict() - msg['header']['cfg'] = cfg - #msg['header']['reindex_task_url'] = reindex_task_url - msg['body'] = uuid - - the_task_body = msgpack.packb(msg, use_bin_type=True) - - # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) - # channel = connection.channel() - exchange = cfg['rabbitmq']['exchange'] - - queue_name = cfg['rabbitmq']['queue_name_6'] - routing_key = cfg['rabbitmq']['routing_key_6'] - - # ------------------------send task----------------------------------- - self.rabbit.publish_task(the_body=the_task_body, - exchange=exchange, - routing_key=routing_key, - queue_name=queue_name) - # --------------------------------------------------------------------- +def create_sampling_task(cfg, channel, uuid): + + # here-below we generate a task for the sample generator (full -> meta) + msg = dict() + msg['header'] = dict() + msg['header']['cfg'] = cfg + #msg['header']['reindex_task_url'] = reindex_task_url + msg['body'] = uuid + + the_body = msgpack.packb(msg, use_bin_type=True) + + # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) + # channel = connection.channel() + exchange = cfg['rabbitmq']['exchange'] + + queue_name = cfg['rabbitmq']['queue_name_6'] + routing_key = cfg['rabbitmq']['routing_key_6'] + queue_logs_name = cfg['rabbitmq']['queue_logs_name'] + routing_key_logs = cfg['rabbitmq']['routing_key_logs'] + exchange_logs_name = cfg['rabbitmq']['exchange_logs_name'] + + channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct') + channel.queue_declare(queue=queue_name, durable=True) + channel.queue_bind(exchange=cfg['rabbitmq']['exchange'], queue=queue_name, routing_key=routing_key) + channel.queue_declare(queue=queue_logs_name, durable=True) + channel.queue_bind(exchange=exchange_logs_name, queue=queue_logs_name, routing_key=routing_key_logs) + # 
---------------------- send log ---------------------------- + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='reindexer', + status='create sampling task', + uuid_prefix='full', + info=uuid + ) + + json_body = json.dumps(log_message) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=exchange_logs_name, + routing_key=routing_key_logs, + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ + + channel.basic_publish( exchange=exchange, + routing_key=routing_key, + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2) + ) + + #connection.close() + + + return + +def on_msg_callback(channel, method, properties, body): + + + decoded_body = msgpack.unpackb(body, raw=False) + cfg = decoded_body['header']['cfg'] + uuid = decoded_body['body'] + count_ref = decoded_body['header']['count'] + + # from lib.elasticsearch_template import template + # template['index_patterns'] = [ cfg['reindexer']['destination_index'] ] + # template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards'] + # template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas'] + + if 'source_url' in cfg['reindexer'].keys(): + es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) + else: + es_source = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) + + # es_logger = logging.getLogger('elasticsearch') + # es_logger.setLevel(logging.INFO) + + the_query = dict() + the_query['query'] = dict() + the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)} + + es_source.indices.refresh(index=cfg['reindexer']['source_index']) + count_es = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count') + # logging.debug("%i document(s) found in the source index with uuid = %s" % (count1, uuid)) + + # if uuid.endswith('.full'): + # + # logging.debug("Waiting for 5 seconds before counting again...") + # time.sleep(5) + # + # es_source.indices.refresh(index=cfg['reindexer']['source_index']) + # count2 = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count') + # logging.debug("%i document(s) found in the source index with uuid = %s" % (count2, uuid)) + # + # if count1 != count2 or count2 == 0: + # + # logging.warning('Documents are still being pushed to the source index. Waiting...') + # time.sleep(5) + # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + # return + # #raise NotEmptyQueueException('Documents are still being pushed to the source index. Waiting...') + # + # elif uuid.endswith('.meta'): + # + # if count1 != 1: + # + # logging.warning('Documents are still being pushed to the source index. Waiting...') + # time.sleep(5) + # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + # return + # + # else: + # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + # logging.error("The uuid ends neither with .full nor with .meta. 
What shall I do?") + # return + + + if count_es != count_ref: + logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid) + logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref)) + time.sleep(5) + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + return - # channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct') - # channel.queue_declare(queue=queue_name, durable=True) - # channel.queue_bind(exchange=cfg['rabbitmq']['exchange'], queue=queue_name, routing_key=routing_key) - # - # channel.basic_publish( exchange=exchange, - # routing_key=routing_key, - # body=the_body, - # properties=pika.BasicProperties(delivery_mode = 2) - # ) - # - #connection.close() + # 1. remove already existing docs from destination index + logging.info("Removing dataset with uuid = %s from the destination index..." % uuid) + + es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) + index = cfg['reindexer']['destination_index'] + + try: + es.indices.refresh(index=index) + except NotFoundError: + # the destination index may not be already present + pass + + the_query = dict() + the_query['query'] = dict() + the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)} + + try: + res = es.delete_by_query(index, doc_type='_doc', body=the_query) + logging.debug(res) + res = es.indices.refresh(index=index) + logging.debug(res) + except NotFoundError: + pass + except Exception as e: + logging.error(e) + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) return - def on_msg_callback(self, channel, method, properties, body): - + # # 2. setup template + # try: + # rep = es.indices.delete_template(cfg['reindexer']['template_name']) + # logging.debug(rep) + # except: + # pass + # + # rep = es.indices.put_template(cfg['reindexer']['template_name'], template) + # # rep = es.indices.get_template("template_1") + # logging.debug(rep) + + + # 3. 
trigger reindexation + + body = { + "source": { + "index": cfg['reindexer']['source_index'], + "query": { + "term": {"uuid.keyword": '{0}'.format(uuid)} + }, + "type": "_doc", + "size": 1000 + }, + "dest": { + "index": cfg['reindexer']['destination_index'], + "type": "_doc" + } + } - decoded_body = msgpack.unpackb(body, raw=False) + if 'source_url' in cfg['reindexer'].keys(): + body['source']['remote'] = {'host': cfg['reindexer']['source_url']} - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='reindexer', - status='Starting...', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ + rep = es.reindex(body, wait_for_completion=False) - cfg = decoded_body['header']['cfg'] - uuid = decoded_body['body'] - count_ref = decoded_body['header']['count'] + logging.debug(rep) - # from lib.elasticsearch_template import template - # template['index_patterns'] = [ cfg['reindexer']['destination_index'] ] - # template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards'] - # template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas'] + if 'task' in rep: + channel.basic_ack(delivery_tag = method.delivery_tag) + #print("") + reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task']) + logging.info("Created reindex task: {0}".format(reindex_task_url)) - if 'source_url' in cfg['reindexer'].keys(): - es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) + # 3. create sampling task (full -> meta) + if '.full' in uuid: + create_sampling_task(cfg, channel, uuid)#, reindex_task_url) + logging.info("Created sampling task.") + # otherwise, remove the lock else: - es_source = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) - - # es_logger = logging.getLogger('elasticsearch') - # es_logger.setLevel(logging.INFO) - - the_query = dict() - the_query['query'] = dict() - the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)} - - es_source.indices.refresh(index=cfg['reindexer']['source_index']) - count_es = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count') - # logging.debug("%i document(s) found in the source index with uuid = %s" % (count1, uuid)) - - # if uuid.endswith('.full'): - # - # logging.debug("Waiting for 5 seconds before counting again...") - # time.sleep(5) - # - # es_source.indices.refresh(index=cfg['reindexer']['source_index']) - # count2 = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count') - # logging.debug("%i document(s) found in the source index with uuid = %s" % (count2, uuid)) - # - # if count1 != count2 or count2 == 0: - # - # logging.warning('Documents are still being pushed to the source index. Waiting...') - # time.sleep(5) - # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # return - # #raise NotEmptyQueueException('Documents are still being pushed to the source index. Waiting...') - # - # elif uuid.endswith('.meta'): - # - # if count1 != 1: - # - # logging.warning('Documents are still being pushed to the source index. 
Waiting...') - # time.sleep(5) - # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # return - # - # else: - # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # logging.error("The uuid ends neither with .full nor with .meta. What shall I do?") - # return - - - if count_es != count_ref: - logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid) - logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref)) - time.sleep(5) - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - return + logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.meta', '')) + unlock( cfg['session']['working_directory'], uuid.replace('.meta', '')) - # 1. remove already existing docs from destination index - logging.info("Removing dataset with uuid = %s from the destination index..." % uuid) - es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) - index = cfg['reindexer']['destination_index'] + else: + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + #print("") + logging.error(json.dumps(rep, indent=4)) + logging.error("Failed") - try: - es.indices.refresh(index=index) - except NotFoundError: - # the destination index may not be already present - pass - - the_query = dict() - the_query['query'] = dict() - the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)} - try: - res = es.delete_by_query(index, doc_type='_doc', body=the_query) - logging.debug(res) - res = es.indices.refresh(index=index) - logging.debug(res) - except NotFoundError: - pass - except Exception as e: - logging.error(e) - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - return - - # # 2. setup template - # try: - # rep = es.indices.delete_template(cfg['reindexer']['template_name']) - # logging.debug(rep) - # except: - # pass - # - # rep = es.indices.put_template(cfg['reindexer']['template_name'], template) - # # rep = es.indices.get_template("template_1") - # logging.debug(rep) - - - # 3. trigger reindexation - - body = { - "source": { - "index": cfg['reindexer']['source_index'], - "query": { - "term": {"uuid.keyword": '{0}'.format(uuid)} - }, - "type": "_doc", - "size": 1000 - }, - "dest": { - "index": cfg['reindexer']['destination_index'], - "type": "_doc" - } - } + return - if 'source_url' in cfg['reindexer'].keys(): - body['source']['remote'] = {'host': cfg['reindexer']['source_url']} - rep = es.reindex(body, wait_for_completion=False) +def main(cfg): - logging.debug(rep) + #from lib.close_connection import on_timeout - if 'task' in rep: - channel.basic_ack(delivery_tag=method.delivery_tag) - #print("") - reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task']) - logging.info("Created reindex task: {0}".format(reindex_task_url)) + connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) + #timeout = 5 + #connection.add_timeout(timeout, on_timeout(connection)) + channel = connection.channel() + exchange = cfg['rabbitmq_exchange'] + # the queue this program will consume messages from: + reindex_tasks_to_create_qn = cfg['rabbitmq_queue'] - # 3. create sampling task (full -> meta) - if '.full' in uuid: - self.create_sampling_task(cfg, channel, uuid)#, reindex_task_url) - logging.info("Created sampling task.") - # otherwise, remove the lock - else: - logging.info("Removing lock for dataset with uuid = %s." 
% uuid.replace('.meta', '')) - unlock( cfg['session']['working_directory'], uuid.replace('.meta', '')) + channel.basic_qos(prefetch_count=1) + channel.basic_consume(on_message_callback=lambda ch, method, properties, body: on_msg_callback(ch, method, properties, body), + queue=reindex_tasks_to_create_qn) + channel.start_consuming() + connection.close() - else: - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - #print("") - logging.error(json.dumps(rep, indent=4)) - logging.error("Failed") - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='reindexer', - status='Terminated', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ - - return - - def main(self): - - with RabbitSession(self.cfg) as self.rabbit: - # ------------------------------------------------------------ - docs_to_enrich_qn = cfg['rabbitmq_queue'] - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.on_msg_callback, - specific_queue=docs_to_enrich_qn) - # #from lib.close_connection import on_timeout - # - # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) - # #timeout = 5 - # #connection.add_timeout(timeout, on_timeout(connection)) - # channel = connection.channel() - # exchange = cfg['rabbitmq_exchange'] - # # the queue this program will consume messages from: - # reindex_tasks_to_create_qn = cfg['rabbitmq_queue'] - # - # channel.basic_qos(prefetch_count=1) - # channel.basic_consume(on_message_callback=lambda ch, method, properties, body: on_msg_callback(ch, method, properties, body), - # queue=reindex_tasks_to_create_qn) - # - # channel.start_consuming() - # - # connection.close() - - return + return if __name__ == '__main__': @@ -290,27 +274,25 @@ if __name__ == '__main__': parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) + parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) args = parser.parse_args() cfg = dict() - cfg['rabbitmq'] = dict() cfg['rabbitmq_host'] = args.host cfg['rabbitmq_port'] = args.port cfg['rabbitmq_exchange'] = args.exchange cfg['rabbitmq_queue'] = args.queue - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' - cfg['rabbitmq']['queue_logs_name'] = 'session_logs' - cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' - cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' + cfg['rabbitmq']['user'] = args.user + cfg['rabbitmq']['password'] = args.password logging.getLogger().setLevel(args.loglevel) logging.info('Starting...') while True: try: - Reindexer(cfg).main() + main(cfg) except pika.exceptions.ChannelClosed: logging.info("Waiting for tasks...") time.sleep(5) diff --git a/workers/sample-generator-oo.py b/workers/sample-generator-oo.py new file mode 100644 index 
0000000000000000000000000000000000000000..a10e22a223e0585056fc9ee008aef1314e818209 --- /dev/null +++ b/workers/sample-generator-oo.py @@ -0,0 +1,195 @@ +import pika +import msgpack +import requests +import json +import time +import os, sys +from elasticsearch import Elasticsearch +from elasticsearch.exceptions import AuthorizationException + +fileDir = os.path.dirname(os.path.abspath(__file__)) +parentDir = os.path.dirname(fileDir) +newPath = os.path.join(parentDir) +sys.path.append(newPath) +from lib.exit_gracefully import exit_gracefully +from lib.my_logging import logging +from lib.locker import unlock + +from lib.rabbit_session import RabbitSession +from lib.log_message import LogMessage + + +class Sampler: + + def __init__(self, cfg): + self.cfg = cfg + self.rabbit = None + + + def callback(self, channel, method, properties, body): + + sample_size = 10 + + decoded_body = msgpack.unpackb(body, raw=False) + + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], + # session_id=cfg['session']['id'], + uuid=decoded_body['header']['cfg']['session']['current_uuid'], + step='sampler', + status='Starting...', + uuid_prefix='meta', + info='no info' + ) + self.rabbit.publish_log(log_message=log_message.__dict__) + # ------------------------------------------------------------ + + cfg = decoded_body['header']['cfg'] + #reindex_task_url = decoded_body['header']['reindex_task_url'] + uuid = decoded_body['body'] + + # get sample records from the ingest index + source_es = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) + + the_query = dict() + the_query['size'] = sample_size + the_query['query'] = dict() + the_query['query']['term'] = {'uuid.keyword': uuid} + + res = source_es.search(cfg['reindexer']['source_index'], '_doc', the_query) + + docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ] + + if len(docs_to_index) == 0: + logging.error('Zero documents found for dataset with uuid = %s: sleeping for 5 seconds...' 
% (uuid)) + time.sleep(5) + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + return + + # delete the already existing samples + destin_es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) + destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) + + the_query = dict() + the_query['query'] = dict() + the_query['query']['term'] = {'editorial-metadata.isSample': True} + the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.meta')} + + logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug']) + try: + res = destin_es.delete_by_query(cfg['reindexer']['destination_index'], doc_type='_doc', body=the_query) + logging.debug(res) + res = destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) + logging.debug(res) + except AuthorizationException: # TODO correct this "Unresolved reference for AuthorizationException" + time.sleep(5) + channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1) + return + except Exception as e: + logging.error("Exception:") + logging.error(e) + logging.error("Exiting.") + exit(1) + + t1 = time.time() + + # push sample records to the destination index + es_body = '' + header = { "index" : { "_index" : cfg['reindexer']['destination_index'], "_type" : "_doc" } } + for doc in docs_to_index: + + doc['editorial-metadata']['isSample'] = True + doc['uuid'] = uuid.replace('.full', '.meta') + + es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc)) + + logging.info("Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug'])) + rep = destin_es.bulk(body=es_body) + + t2 = time.time() + + if rep['errors'] == False: + channel.basic_ack(delivery_tag = method.delivery_tag) + logging.info("Done in %s seconds." % (t2-t1)) + destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) + logging.info("Removing lock for dataset with uuid = %s." 
% uuid.replace('.full', '')) + unlock( cfg['session']['working_directory'], uuid.replace('.full', '') ) + else: + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + logging.error(json.dumps(rep, indent=4)) + logging.error("Failed") + + # else: + # + # time.sleep(5) + # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], + # session_id=cfg['session']['id'], + uuid=decoded_body['header']['cfg']['session']['current_uuid'], + step='sampler', + status='Terminated', + uuid_prefix='meta', + info='no info' + ) + self.rabbit.publish_log(log_message=log_message.__dict__) + # ------------------------------------------------------------ + return + + def main(self): + with RabbitSession(self.cfg) as self.rabbit: + # ------------------------------------------------------------ + docs_to_enrich_qn = cfg['rabbitmq_queue'] + self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.callback, + specific_queue=docs_to_enrich_qn) + + return + + +if __name__ == "__main__": + + import yaml + import time + import signal + import argparse + + signal.signal(signal.SIGINT, exit_gracefully) + + parser = argparse.ArgumentParser(description='Sample generator') + parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True) + parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672) + parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) + parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) + parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + + args = parser.parse_args() + + cfg = dict() + cfg['rabbitmq'] = dict() + cfg['rabbitmq_host'] = args.host + cfg['rabbitmq_port'] = args.port + cfg['rabbitmq_exchange'] = args.exchange + cfg['rabbitmq_queue'] = args.queue + cfg['rabbitmq']['user'] = 'admin' + cfg['rabbitmq']['password'] = 'admin' + cfg['rabbitmq']['queue_logs_name'] = 'session_logs' + cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' + cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' + + logging.getLogger().setLevel(args.loglevel) + logging.info('Starting...') + + while True: + + try: + Sampler(cfg).main() + except pika.exceptions.ChannelClosed: + logging.info("Waiting for tasks...") + time.sleep(5) + except pika.exceptions.AMQPConnectionError: + logging.info('Waiting for RabbitMQ to be reachable...') + time.sleep(5) + except Exception as e: + logging.error(e) + time.sleep(5) + exit(1) diff --git a/workers/sample-generator.py b/workers/sample-generator.py index e86135ee496b0cba964404501ff45bf4aba0b89b..b13fd14c40c8145f0366094425dbbdf477ca920b 100644 --- a/workers/sample-generator.py +++ b/workers/sample-generator.py @@ -4,8 +4,8 @@ import requests import json import time import os, sys -from elasticsearch import Elasticsearch -from elasticsearch.exceptions import AuthorizationException +from elasticsearch import Elasticsearch, AuthorizationException + fileDir = os.path.dirname(os.path.abspath(__file__)) parentDir = os.path.dirname(fileDir) @@ -14,274 +14,290 @@ sys.path.append(newPath) from lib.exit_gracefully import exit_gracefully from lib.my_logging import logging from lib.locker import unlock - -from lib.rabbit_session 
import RabbitSession from lib.log_message import LogMessage -class Sampler: +def old_callback(channel, method, properties, body): - def __init__(self, cfg): - self.cfg = cfg - self.rabbit = None + decoded_body = msgpack.unpackb(body, raw=False) + print(decoded_body) - # def old_callback(self, channel, method, properties, body): - # - # decoded_body = msgpack.unpackb(body, raw=False) - # print(decoded_body) - # - # reindex_task_url = decoded_body['reindex_task_url'] - # - # res = requests.get(reindex_task_url) - # data = res.json() - # - # print( json.dumps(data, indent=4) ) - # - # if data['completed'] == True: - # print(True) - # - # # delete all the already existing samples - # - # destin_es = Elasticsearch([decoded_body['dest_url']], timeout=60) - # destin_es.indices.refresh(index=decoded_body['dest_index']) - # - # the_query = dict() - # the_query['query'] = dict() - # the_query['query']['term'] = {'editorial-metadata.isSample': True} - # - # res = destin_es.delete_by_query(decoded_body['dest_index'], doc_type='_doc', body=the_query) - # - # print( json.dumps(res) ) - # - # #exit(1) - # - # # get sample records from the ingest index - # source_es = Elasticsearch([decoded_body['source_url']], timeout=60) - # - # the_query = dict() - # the_query['size'] = 0 - # # the_query['query'] = dict() - # # the_query['query']['exists'] = dict() - # # the_query['query']['exists']['field'] = "data-fr.properties.address.addressCountry" - # the_query['aggs'] = dict() - # the_query['aggs']['my-aggregation'] = dict() - # the_query['aggs']['my-aggregation']['terms'] = dict() - # the_query['aggs']['my-aggregation']['terms']['field'] = "uuid" - # the_query['aggs']['my-aggregation']['terms']['size'] = 2000 - # - # - # res = source_es.search(decoded_body['source_index'], '_doc', the_query) - # - # # print( json.dumps(res['aggregations'], indent=4) ) - # - # - # print( json.dumps(res['aggregations']['my-aggregation']['buckets'], indent=4) ) - # - # - # full_uuids = [] - # - # for bucket in res['aggregations']['my-aggregation']['buckets']: - # if '.full' in bucket['key']: - # full_uuids.append(bucket['key']) - # - # print(full_uuids) - # - # - # for uuid in full_uuids: - # print(uuid) - # - # the_query = dict() - # the_query['size'] = 10 - # the_query['query'] = dict() - # the_query['query']['match'] = {'uuid': uuid} - # - # res = source_es.search(decoded_body['source_index'], '_doc', the_query) - # print( len(res['hits']['hits']) ) - # - # docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ] - # - # t1 = time.time() - # - # # push sample records to the destination index - # es_body = '' - # header = { "index" : { "_index" : decoded_body['dest_index'], "_type" : "_doc" } } - # for doc in docs_to_index: - # # try: - # # header['index']['_id'] = doc['_id'] #hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest() - # # del doc['_id'] - # # except: - # # header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest() - # - # #header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest() - # - # doc['editorial-metadata']['isSample'] = True - # doc['uuid'] = uuid.replace('.full', '.meta') - # - # es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc)) - # - # - # rep = destin_es.bulk(body=es_body) - # - # t2 = time.time() - # - # if rep['errors'] == False: - # channel.basic_ack(delivery_tag = method.delivery_tag) - # logging.info("Done in %s seconds." 
% (t2-t1)) - # destin_es.indices.refresh(index=decoded_body['dest_index']) - # else: - # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # logging.error(json.dumps(rep, indent=4)) - # logging.error("Failed") - # - # else: - # - # time.sleep(5) - # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # - # return - # + reindex_task_url = decoded_body['reindex_task_url'] + + res = requests.get(reindex_task_url) + data = res.json() - def callback(self, channel, method, properties, body): + print( json.dumps(data, indent=4) ) - sample_size = 10 + if data['completed'] == True: + print(True) - decoded_body = msgpack.unpackb(body, raw=False) + # delete all the already existing samples - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='sampler', - status='Starting...', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ + destin_es = Elasticsearch([decoded_body['dest_url']], timeout=60) + destin_es.indices.refresh(index=decoded_body['dest_index']) - cfg = decoded_body['header']['cfg'] - #reindex_task_url = decoded_body['header']['reindex_task_url'] - uuid = decoded_body['body'] + the_query = dict() + the_query['query'] = dict() + the_query['query']['term'] = {'editorial-metadata.isSample': True} + + res = destin_es.delete_by_query(decoded_body['dest_index'], doc_type='_doc', body=the_query) + print( json.dumps(res) ) - #res = requests.get(reindex_task_url) - #data = res.json() + #exit(1) # get sample records from the ingest index - source_es = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) + source_es = Elasticsearch([decoded_body['source_url']], timeout=60) the_query = dict() - the_query['size'] = sample_size - the_query['query'] = dict() - the_query['query']['term'] = {'uuid.keyword': uuid} + the_query['size'] = 0 + # the_query['query'] = dict() + # the_query['query']['exists'] = dict() + # the_query['query']['exists']['field'] = "data-fr.properties.address.addressCountry" + the_query['aggs'] = dict() + the_query['aggs']['my-aggregation'] = dict() + the_query['aggs']['my-aggregation']['terms'] = dict() + the_query['aggs']['my-aggregation']['terms']['field'] = "uuid" + the_query['aggs']['my-aggregation']['terms']['size'] = 2000 - res = source_es.search(cfg['reindexer']['source_index'], '_doc', the_query) - docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ] + res = source_es.search(decoded_body['source_index'], '_doc', the_query) - if len(docs_to_index) == 0: - logging.error('Zero documents found for dataset with uuid = %s: sleeping for 5 seconds...' 
% (uuid)) - time.sleep(5) - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - return + # print( json.dumps(res['aggregations'], indent=4) ) - # delete the already existing samples - destin_es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) - destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) - the_query = dict() - the_query['query'] = dict() - the_query['query']['term'] = {'editorial-metadata.isSample': True} - the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.meta')} + print( json.dumps(res['aggregations']['my-aggregation']['buckets'], indent=4) ) - logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug']) - try: - res = destin_es.delete_by_query(cfg['reindexer']['destination_index'], doc_type='_doc', body=the_query) - logging.debug(res) - res = destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) - logging.debug(res) - except AuthorizationException: # TODO correct this "Unresolved reference for AuthorizationException" - time.sleep(5) - channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1) - return - except Exception as e: - logging.error("Exception:") - logging.error(e) - logging.error("Exiting.") - exit(1) - t1 = time.time() - - # push sample records to the destination index - es_body = '' - header = { "index" : { "_index" : cfg['reindexer']['destination_index'], "_type" : "_doc" } } - for doc in docs_to_index: - - doc['editorial-metadata']['isSample'] = True - doc['uuid'] = uuid.replace('.full', '.meta') - - es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc)) - - logging.info("Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug'])) - rep = destin_es.bulk(body=es_body) - - t2 = time.time() - - if rep['errors'] == False: - channel.basic_ack(delivery_tag = method.delivery_tag) - logging.info("Done in %s seconds." % (t2-t1)) - destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) - logging.info("Removing lock for dataset with uuid = %s." 
% uuid.replace('.full', '')) - unlock( cfg['session']['working_directory'], uuid.replace('.full', '') ) - else: - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - logging.error(json.dumps(rep, indent=4)) - logging.error("Failed") - - # else: - # - # time.sleep(5) - # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='sampler', - status='Terminated', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ + full_uuids = [] + + for bucket in res['aggregations']['my-aggregation']['buckets']: + if '.full' in bucket['key']: + full_uuids.append(bucket['key']) + + print(full_uuids) + + + for uuid in full_uuids: + print(uuid) + + the_query = dict() + the_query['size'] = 10 + the_query['query'] = dict() + the_query['query']['match'] = {'uuid': uuid} + + res = source_es.search(decoded_body['source_index'], '_doc', the_query) + print( len(res['hits']['hits']) ) + + docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ] + + t1 = time.time() + + # push sample records to the destination index + es_body = '' + header = { "index" : { "_index" : decoded_body['dest_index'], "_type" : "_doc" } } + for doc in docs_to_index: + # try: + # header['index']['_id'] = doc['_id'] #hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest() + # del doc['_id'] + # except: + # header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest() + + #header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest() + + doc['editorial-metadata']['isSample'] = True + doc['uuid'] = uuid.replace('.full', '.meta') + + es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc)) + + + rep = destin_es.bulk(body=es_body) + + t2 = time.time() + + if rep['errors'] == False: + channel.basic_ack(delivery_tag = method.delivery_tag) + logging.info("Done in %s seconds." 
% (t2-t1)) + destin_es.indices.refresh(index=decoded_body['dest_index']) + else: + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + logging.error(json.dumps(rep, indent=4)) + logging.error("Failed") + + else: + + time.sleep(5) + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + + return + + +def callback(channel, method, properties, body): + + sample_size = 10 + + decoded_body = msgpack.unpackb(body, raw=False) + + cfg = decoded_body['header']['cfg'] + #reindex_task_url = decoded_body['header']['reindex_task_url'] + uuid = decoded_body['body'] + queue_logs_name = cfg['rabbitmq']['queue_logs_name'] + routing_key_logs = cfg['rabbitmq']['routing_key_logs'] + exchange_logs_name = cfg['rabbitmq']['exchange_logs_name'] + channel.queue_declare(queue=queue_logs_name, durable=True) + channel.queue_bind(exchange=exchange_logs_name, queue=queue_logs_name, routing_key=routing_key_logs) + + + #res = requests.get(reindex_task_url) + #data = res.json() + + # get sample records from the ingest index + source_es = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) + + the_query = dict() + the_query['size'] = sample_size + the_query['query'] = dict() + the_query['query']['term'] = {'uuid.keyword': uuid} + + res = source_es.search(cfg['reindexer']['source_index'], '_doc', the_query) + + docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ] + + if len(docs_to_index) == 0: + logging.error('Zero documents found for dataset with uuid = %s: sleeping for 5 seconds...' % (uuid)) + time.sleep(5) + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) return - def main(self): - with RabbitSession(self.cfg) as self.rabbit: - # ------------------------------------------------------------ - docs_to_enrich_qn = cfg['rabbitmq_queue'] - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.callback, - specific_queue=docs_to_enrich_qn) - # # es_logger = logging.getLogger('elasticsearch') - # # es_logger.setLevel(logging.INFO) - # - # #from lib.close_connection import on_timeout - # - # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) - # #timeout = 5 - # #connection.add_timeout(timeout, on_timeout(connection)) - # - # channel = connection.channel() - # # exchange = cfg['rabbitmq_exchange'] - # # the queue this program will consume messages from: - # sampling_tasks_qn = cfg['rabbitmq_queue'] - # - # channel.basic_qos(prefetch_count=1) - # channel.basic_consume(on_message_callback=lambda ch, method, properties, body: callback(ch, method, properties, body), - # queue=sampling_tasks_qn) - # channel.start_consuming() - # connection.close() - # + # delete the already existing samples + destin_es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) + destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) + + the_query = dict() + the_query['query'] = dict() + the_query['query']['term'] = {'editorial-metadata.isSample': True} + the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.meta')} + + logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug']) + + # ---------------------- send log ---------------------------- + message = "Deleting already existing samples for dataset with slug =" + str(docs_to_index[0]['slug']) + log_message = LogMessage(session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='sampler', + status=message, + uuid_prefix='full', + info=uuid + ) 
+
+    json_body = json.dumps(log_message.__dict__)
+
+    print(" [x] json body : ", json_body)
+
+    channel.basic_publish(exchange=exchange_logs_name,
+                          routing_key=routing_key_logs,
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------
+    try:
+        res = destin_es.delete_by_query(cfg['reindexer']['destination_index'], doc_type='_doc', body=the_query)
+        logging.debug(res)
+        res = destin_es.indices.refresh(index=cfg['reindexer']['destination_index'])
+        logging.debug(res)
+    except AuthorizationException:
+        time.sleep(5)
+        channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+        return
+    except Exception as e:
+        logging.error("Exception:")
+        logging.error(e)
+        logging.error("Exiting.")
+        exit(1)
+
+
+    t1 = time.time()
+
+    # push sample records to the destination index
+    es_body = ''
+    header = { "index" : { "_index" : cfg['reindexer']['destination_index'], "_type" : "_doc" } }
+    for doc in docs_to_index:
+
+        doc['editorial-metadata']['isSample'] = True
+        doc['uuid'] = uuid.replace('.full', '.meta')
+
+        es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
+
+    logging.info("Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug']))
+
+    # ---------------------- send log ----------------------------
+    message = "Pushing " + str(len(docs_to_index)) + " samples to Elasticsearch for dataset " + str(docs_to_index[0]['slug'])
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='sampler',
+                             status=message,
+                             uuid_prefix='full',
+                             info=uuid
+                             )
+
+    json_body = json.dumps(log_message.__dict__)
+
+    print(" [x] json body : ", json_body)
+
+    channel.basic_publish(exchange=exchange_logs_name,
+                          routing_key=routing_key_logs,
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------
+
+    rep = destin_es.bulk(body=es_body)
+
+    t2 = time.time()
+
+    if rep['errors'] == False:
+        channel.basic_ack(delivery_tag = method.delivery_tag)
+        logging.info("Done in %s seconds." % (t2-t1))
+        destin_es.indices.refresh(index=cfg['reindexer']['destination_index'])
+        logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.full', ''))
+        unlock( cfg['session']['working_directory'], uuid.replace('.full', '') )
+    else:
+        channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+        logging.error(json.dumps(rep, indent=4))
+        logging.error("Failed")
+
+    # else:
+    #
+    #     time.sleep(5)
+    #     channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+
+    return
+
+
+def main(cfg):
+
+    # es_logger = logging.getLogger('elasticsearch')
+    # es_logger.setLevel(logging.INFO)
+
+    #from lib.close_connection import on_timeout
+
+    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
+    #timeout = 5
+    #connection.add_timeout(timeout, on_timeout(connection))
+
+    channel = connection.channel()
+    # exchange = cfg['rabbitmq_exchange']
+    # the queue this program will consume messages from:
+    sampling_tasks_qn = cfg['rabbitmq_queue']
+
+    channel.basic_qos(prefetch_count=1)
+    channel.basic_consume(on_message_callback=lambda ch, method, properties, body: callback(ch, method, properties, body),
+                          queue=sampling_tasks_qn)
+    channel.start_consuming()
+    connection.close()
+
+    return
 if __name__ == "__main__":
@@ -299,20 +315,18 @@ if __name__ == "__main__":
     parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
     parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
     parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
+    parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True)
+    parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True)

     args = parser.parse_args()

     cfg = dict()
     cfg['rabbitmq'] = dict()
     cfg['rabbitmq_host'] = args.host
     cfg['rabbitmq_port'] = args.port
     cfg['rabbitmq_exchange'] = args.exchange
     cfg['rabbitmq_queue'] = args.queue
-    cfg['rabbitmq']['user'] = 'admin'
-    cfg['rabbitmq']['password'] = 'admin'
-    cfg['rabbitmq']['queue_logs_name'] = 'session_logs'
-    cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key'
-    cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs'
+    cfg['rabbitmq']['user'] = args.user
+    cfg['rabbitmq']['password'] = args.password

     logging.getLogger().setLevel(args.loglevel)
     logging.info('Starting...')
@@ -320,7 +334,7 @@ if __name__ == "__main__":
     while True:

         try:
-            Sampler(cfg).main()
+            main(cfg)
         except pika.exceptions.ChannelClosed:
             logging.info("Waiting for tasks...")
             time.sleep(5)