Commit 77f6cb6c authored by ddamiron

update reindexer with publish logs to mongo

update sampler with publish logs to mongo
parent 77ace0ac
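Both scripts below now build a LogMessage (session_id, uuid, step, status, uuid_prefix, info) and push it through RabbitSession.publish_log() onto the session_logs queue of the download_data_grandlyon_com_logs exchange (routing key scripts_log_key); a downstream consumer is then expected to persist those logs to MongoDB. The following is a minimal, hypothetical sketch of that publishing pattern using plain pika: the connection parameters, exchange type, durability flags and JSON wire format are assumptions for illustration, not the actual lib.rabbit_session implementation.

import json
import pika

# hypothetical log payload, mirroring the fields of lib.log_message.LogMessage
log_message = {
    'session_id': 'some-session-id',
    'uuid': 'some-dataset-uuid.meta',
    'step': 'reindexer',
    'status': 'Starting...',
    'uuid_prefix': 'meta',
    'info': 'no info'
}

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))  # host is an assumption
channel = connection.channel()

# exchange/queue/routing-key names come from the config below; type and durability are assumptions
channel.exchange_declare(exchange='download_data_grandlyon_com_logs', exchange_type='direct', durable=True)
channel.queue_declare(queue='session_logs', durable=True)
channel.queue_bind(queue='session_logs', exchange='download_data_grandlyon_com_logs', routing_key='scripts_log_key')

# the real publish_log may serialize differently (e.g. msgpack); JSON is used here for illustration
channel.basic_publish(exchange='download_data_grandlyon_com_logs',
                      routing_key='scripts_log_key',
                      body=json.dumps(log_message))
connection.close()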
import time
import json
import msgpack
import pika
import os, sys

from elasticsearch import Elasticsearch, NotFoundError

fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
newPath = os.path.join(parentDir)
sys.path.append(newPath)

from lib.my_logging import logging
from lib.exit_gracefully import exit_gracefully
from lib.locker import unlock
from lib.rabbit_session import RabbitSession
from lib.log_message import LogMessage


class NotEmptyQueueException(Exception):
    pass

class Reindexer:

    def __init__(self, cfg):
        self.cfg = cfg
        self.rabbit = None

    def create_sampling_task(self, cfg, channel, uuid):
        # generate a task for the sample generator (full -> meta)
        msg = dict()
        msg['header'] = dict()
        msg['header']['cfg'] = cfg
        # msg['header']['reindex_task_url'] = reindex_task_url
        msg['body'] = uuid

        the_task_body = msgpack.packb(msg, use_bin_type=True)

        exchange = cfg['rabbitmq']['exchange']
        queue_name = cfg['rabbitmq']['queue_name_6']
        routing_key = cfg['rabbitmq']['routing_key_6']

        # ------------------------ send task ----------------------------------
        self.rabbit.publish_task(the_body=the_task_body,
                                 exchange=exchange,
                                 routing_key=routing_key,
                                 queue_name=queue_name)
        # ----------------------------------------------------------------------
        return
    def on_msg_callback(self, channel, method, properties, body):
        decoded_body = msgpack.unpackb(body, raw=False)

        # ---------------------- send log ----------------------------
        log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'],
                                 # session_id=cfg['session']['id'],
                                 uuid=decoded_body['header']['cfg']['session']['current_uuid'],
                                 step='reindexer',
                                 status='Starting...',
                                 uuid_prefix='meta',
                                 info='no info')
        self.rabbit.publish_log(log_message=log_message.__dict__)
        # ------------------------------------------------------------

        cfg = decoded_body['header']['cfg']
        uuid = decoded_body['body']
        count_ref = decoded_body['header']['count']

        if 'source_url' in cfg['reindexer'].keys():
            es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60)
        else:
            es_source = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)

        the_query = dict()
        the_query['query'] = dict()
        the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)}

        # wait until all the documents of this dataset have reached the source index
        es_source.indices.refresh(index=cfg['reindexer']['source_index'])
        count_es = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count')

        if count_es != count_ref:
            logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid)
            logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref))
            time.sleep(5)
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
            return

        # 1. remove already existing docs from the destination index
        logging.info("Removing dataset with uuid = %s from the destination index..." % uuid)

        es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
        index = cfg['reindexer']['destination_index']

        try:
            es.indices.refresh(index=index)
        except NotFoundError:
            # the destination index may not exist yet
            pass

        the_query = dict()
        the_query['query'] = dict()
        the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)}

        try:
            res = es.delete_by_query(index, doc_type='_doc', body=the_query)
            logging.debug(res)
            res = es.indices.refresh(index=index)
            logging.debug(res)
        except NotFoundError:
            pass
        except Exception as e:
            logging.error(e)
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
            return
        # 2. trigger reindexation
        body = {
            "source": {
                "index": cfg['reindexer']['source_index'],
                "query": {
                    "term": {"uuid.keyword": '{0}'.format(uuid)}
                },
                "type": "_doc",
                "size": 1000
            },
            "dest": {
                "index": cfg['reindexer']['destination_index'],
                "type": "_doc"
            }
        }

        if 'source_url' in cfg['reindexer'].keys():
            body['source']['remote'] = {'host': cfg['reindexer']['source_url']}

        rep = es.reindex(body, wait_for_completion=False)
        logging.debug(rep)

        if 'task' in rep:
            channel.basic_ack(delivery_tag=method.delivery_tag)
            reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task'])
            logging.info("Created reindex task: {0}".format(reindex_task_url))

            # 3. create sampling task (full -> meta)
            if '.full' in uuid:
                self.create_sampling_task(cfg, channel, uuid)  # , reindex_task_url)
                logging.info("Created sampling task.")
            # otherwise, remove the lock
            else:
                logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.meta', ''))
                unlock(cfg['session']['working_directory'], uuid.replace('.meta', ''))
        else:
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
            logging.error(json.dumps(rep, indent=4))
            logging.error("Failed")

        # ---------------------- send log ----------------------------
        log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'],
                                 # session_id=cfg['session']['id'],
                                 uuid=decoded_body['header']['cfg']['session']['current_uuid'],
                                 step='reindexer',
                                 status='Terminated',
                                 uuid_prefix='meta',
                                 info='no info')
        self.rabbit.publish_log(log_message=log_message.__dict__)
        # ------------------------------------------------------------
        return
    def main(self):
        with RabbitSession(self.cfg) as self.rabbit:
            # ------------------------------------------------------------
            docs_to_enrich_qn = self.cfg['rabbitmq_queue']
            self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.on_msg_callback,
                                                                 specific_queue=docs_to_enrich_qn)
        return

if __name__ == '__main__':

    import yaml
    import time
    import signal
    import argparse

    signal.signal(signal.SIGINT, exit_gracefully)

    parser = argparse.ArgumentParser(description='Incremental reindexer')
    parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
    parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
    parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
    parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
    args = parser.parse_args()

    cfg = dict()
    cfg['rabbitmq'] = dict()
    cfg['rabbitmq_host'] = args.host
    cfg['rabbitmq_port'] = args.port
    cfg['rabbitmq_exchange'] = args.exchange
    cfg['rabbitmq_queue'] = args.queue
    cfg['rabbitmq']['user'] = 'admin'
    cfg['rabbitmq']['password'] = 'admin'
    cfg['rabbitmq']['queue_logs_name'] = 'session_logs'
    cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key'
    cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs'

    logging.getLogger().setLevel(args.loglevel)
    logging.info('Starting...')

    while True:
        try:
            Reindexer(cfg).main()
        except pika.exceptions.ChannelClosed:
            logging.info("Waiting for tasks...")
            time.sleep(5)
        except pika.exceptions.AMQPConnectionError:
            logging.info('Waiting for RabbitMQ to be reachable...')
            time.sleep(5)
        except Exception as e:
            logging.error(e)
            time.sleep(5)
            exit(1)
import pika
import msgpack
import requests
import json
import time
import os, sys
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import AuthorizationException
fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
newPath = os.path.join(parentDir)
sys.path.append(newPath)
from lib.exit_gracefully import exit_gracefully
from lib.my_logging import logging
from lib.locker import unlock
from lib.rabbit_session import RabbitSession
from lib.log_message import LogMessage

class Sampler:

    def __init__(self, cfg):
        self.cfg = cfg
        self.rabbit = None

    def callback(self, channel, method, properties, body):
        sample_size = 10

        decoded_body = msgpack.unpackb(body, raw=False)

        # ---------------------- send log ----------------------------
        log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'],
                                 # session_id=cfg['session']['id'],
                                 uuid=decoded_body['header']['cfg']['session']['current_uuid'],
                                 step='sampler',
                                 status='Starting...',
                                 uuid_prefix='meta',
                                 info='no info')
        self.rabbit.publish_log(log_message=log_message.__dict__)
        # ------------------------------------------------------------

        cfg = decoded_body['header']['cfg']
        # reindex_task_url = decoded_body['header']['reindex_task_url']
        uuid = decoded_body['body']

        # get sample records from the ingest index
        source_es = Elasticsearch([cfg['reindexer']['source_url']], timeout=60)

        the_query = dict()
        the_query['size'] = sample_size
        the_query['query'] = dict()
        the_query['query']['term'] = {'uuid.keyword': uuid}

        res = source_es.search(cfg['reindexer']['source_index'], '_doc', the_query)
        docs_to_index = [doc['_source'] for doc in res['hits']['hits']]

        if len(docs_to_index) == 0:
            logging.error('Zero documents found for dataset with uuid = %s: sleeping for 5 seconds...' % uuid)
            time.sleep(5)
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
            return
        # delete the already existing samples
        destin_es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
        destin_es.indices.refresh(index=cfg['reindexer']['destination_index'])

        the_query = dict()
        the_query['query'] = dict()
        # only delete documents that are samples of this very dataset
        the_query['query']['bool'] = {
            'must': [
                {'term': {'editorial-metadata.isSample': True}},
                {'term': {'uuid.keyword': uuid.replace('.full', '.meta')}}
            ]
        }

        logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug'])

        try:
            res = destin_es.delete_by_query(cfg['reindexer']['destination_index'], doc_type='_doc', body=the_query)
            logging.debug(res)
            res = destin_es.indices.refresh(index=cfg['reindexer']['destination_index'])
            logging.debug(res)
        except AuthorizationException:
            time.sleep(5)
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
            return
        except Exception as e:
            logging.error("Exception:")
            logging.error(e)
            logging.error("Exiting.")
            exit(1)
        t1 = time.time()

        # push sample records to the destination index
        es_body = ''
        header = {"index": {"_index": cfg['reindexer']['destination_index'], "_type": "_doc"}}

        for doc in docs_to_index:
            doc['editorial-metadata']['isSample'] = True
            doc['uuid'] = uuid.replace('.full', '.meta')
            es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))

        logging.info("Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug']))
        rep = destin_es.bulk(body=es_body)

        t2 = time.time()

        if not rep['errors']:
            channel.basic_ack(delivery_tag=method.delivery_tag)
            logging.info("Done in %s seconds." % (t2 - t1))
            destin_es.indices.refresh(index=cfg['reindexer']['destination_index'])
            logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.full', ''))
            unlock(cfg['session']['working_directory'], uuid.replace('.full', ''))
        else:
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
            logging.error(json.dumps(rep, indent=4))
            logging.error("Failed")

        # else:
        #
        #     time.sleep(5)
        #     channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)

        # ---------------------- send log ----------------------------
        log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'],
                                 # session_id=cfg['session']['id'],
                                 uuid=decoded_body['header']['cfg']['session']['current_uuid'],
                                 step='sampler',
                                 status='Terminated',
                                 uuid_prefix='meta',
                                 info='no info')
        self.rabbit.publish_log(log_message=log_message.__dict__)
        # ------------------------------------------------------------
        return
    def main(self):
        with RabbitSession(self.cfg) as self.rabbit:
            # ------------------------------------------------------------
            docs_to_enrich_qn = self.cfg['rabbitmq_queue']
            self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.callback,
                                                                 specific_queue=docs_to_enrich_qn)
        return

if __name__ == "__main__":

    import yaml
    import time
    import signal
    import argparse

    signal.signal(signal.SIGINT, exit_gracefully)

    parser = argparse.ArgumentParser(description='Sample generator')
    parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
    parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
    parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
    parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
    args = parser.parse_args()

    cfg = dict()
    cfg['rabbitmq'] = dict()
    cfg['rabbitmq_host'] = args.host
    cfg['rabbitmq_port'] = args.port
    cfg['rabbitmq_exchange'] = args.exchange
    cfg['rabbitmq_queue'] = args.queue
    cfg['rabbitmq']['user'] = 'admin'
    cfg['rabbitmq']['password'] = 'admin'
    cfg['rabbitmq']['queue_logs_name'] = 'session_logs'
    cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key'
    cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs'

    logging.getLogger().setLevel(args.loglevel)
    logging.info('Starting...')

    while True:
        try:
            Sampler(cfg).main()
        except pika.exceptions.ChannelClosed:
            logging.info("Waiting for tasks...")
            time.sleep(5)
        except pika.exceptions.AMQPConnectionError:
            logging.info('Waiting for RabbitMQ to be reachable...')
            time.sleep(5)
        except Exception as e:
            logging.error(e)
            time.sleep(5)
            exit(1)