diff --git a/8-reindexer.py b/8-reindexer.py
index 5c5388c7e2c647ab88a0c6500127af72c83b263f..d9234d511c6c08665476c86d36616eff5224efb7 100644
--- a/8-reindexer.py
+++ b/8-reindexer.py
@@ -15,7 +15,7 @@ def main(cfg, wait=True):
     template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
     template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
     template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
-    
+
     if wait:
 
         if 'source_url' in cfg['reindexer'].keys():
@@ -82,6 +82,37 @@ def main(cfg, wait=True):
 
 
     res = mongo_collection.find_one_and_update({'_id': cfg['session']['id']}, {'$set': {'reindex_task': reindex_task_url}}, upsert=True)
+    # -----------------------------------------------------------------------
+
+    msg = {
+        'reindex_task_url': reindex_task_url,
+        'source_url': cfg['reindexer']['source_url'],
+        'dest_url': cfg['reindexer']['destination_url'],
+        'source_index': cfg['reindexer']['source_index'],
+        'dest_index': cfg['reindexer']['destination_index']
+    }
+
+    the_body = msgpack.packb(msg, use_bin_type=True)
+
+    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
+    channel = connection.channel()
+    exchange = cfg['rabbitmq']['exchange']
+
+    queue_name = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_6_suffix']
+    routing_key = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_6_suffix']
+
+    channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct')
+    channel.queue_declare(queue=queue_name, durable=True)
+    channel.queue_bind(exchange=cfg['rabbitmq']['exchange'], queue=queue_name, routing_key=routing_key)
+
+    # delivery_mode=2 marks the message as persistent
+    channel.basic_publish( exchange=exchange,
+                           routing_key=routing_key,
+                           body=the_body,
+                           properties=pika.BasicProperties(delivery_mode = 2)
+                         )
+
+
 
 
 if __name__ == '__main__':
diff --git a/9-sample-generator.py b/9-sample-generator.py
new file mode 100644
index 0000000000000000000000000000000000000000..3b78ea50900879c936d17601d7aecbfdc8b3ea20
--- /dev/null
+++ b/9-sample-generator.py
@@ -0,0 +1,170 @@
+import pika
+import msgpack
+import requests
+import json
+import time
+from elasticsearch import Elasticsearch
+from utils.exit_gracefully import exit_gracefully
+from utils.my_logging import logging
+
+
+def do_something(channel, method, properties, body):
+
+    decoded_body = msgpack.unpackb(body, raw=False)
+    print(decoded_body)
+
+    reindex_task_url = decoded_body['reindex_task_url']
+
+    res = requests.get(reindex_task_url)
+    data = res.json()
+
+    print( json.dumps(data, indent=4) )
+
+    if data['completed'] == True:
+        print(True)
+
+        # delete all the already existing samples
+
+        destin_es = Elasticsearch([decoded_body['dest_url']], timeout=60)
+        destin_es.indices.refresh(index=decoded_body['dest_index'])
+
+        the_query = dict()
+        the_query['query'] = dict()
+        the_query['query']['term'] = {'editorial-metadata.isSample': True}
+
+        res = destin_es.delete_by_query(decoded_body['dest_index'], doc_type='_doc', body=the_query)
+
+        print( json.dumps(res) )
+
+        #exit(1)
+
+        # get sample records from the ingest index
+        source_es = Elasticsearch([decoded_body['source_url']], timeout=60)
+
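+        # The query built below amounts to the following JSON search body;
+        # it assumes the "uuid" field is mapped so that a terms aggregation
+        # can run on it (for instance as a keyword field):
+        #
+        # {
+        #   "size": 0,
+        #   "aggs": {
+        #     "my-aggregation": {
+        #       "terms": { "field": "uuid", "size": 2000 }
+        #     }
+        #   }
+        # }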
+        the_query = dict()
+        the_query['size'] = 0
+        # the_query['query'] = dict()
+        # the_query['query']['exists'] = dict()
+        # the_query['query']['exists']['field'] = "data-fr.properties.address.addressCountry"
+        the_query['aggs'] = dict()
+        the_query['aggs']['my-aggregation'] = dict()
+        the_query['aggs']['my-aggregation']['terms'] = dict()
+        the_query['aggs']['my-aggregation']['terms']['field'] = "uuid"
+        the_query['aggs']['my-aggregation']['terms']['size'] = 2000
+
+
+        res = source_es.search(decoded_body['source_index'], '_doc', the_query)
+
+        # print( json.dumps(res['aggregations'], indent=4) )
+
+
+        print( json.dumps(res['aggregations']['my-aggregation']['buckets'], indent=4) )
+
+
+        full_uuids = []
+
+        for bucket in res['aggregations']['my-aggregation']['buckets']:
+            if '.full' in bucket['key']:
+                full_uuids.append(bucket['key'])
+
+        print(full_uuids)
+
+
+        for uuid in full_uuids:
+            print(uuid)
+
+            the_query = dict()
+            the_query['size'] = 10
+            the_query['query'] = dict()
+            the_query['query']['match'] = {'uuid': uuid}
+
+            res = source_es.search(decoded_body['source_index'], '_doc', the_query)
+            print( len(res['hits']['hits']) )
+
+            docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ]
+
+            t1 = time.time()
+
+            # push sample records to the destination index
+            es_body = ''
+            header = { "index" : { "_index" : decoded_body['dest_index'], "_type" : "_doc" } }
+            for doc in docs_to_index:
+                # try:
+                #     header['index']['_id'] = doc['_id'] #hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
+                #     del doc['_id']
+                # except:
+                #     header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
+
+                #header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
+
+                doc['editorial-metadata']['isSample'] = True
+                doc['uuid'] = uuid.replace('.full', '.meta')
+
+                es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
+
+
+            rep = destin_es.bulk(body=es_body)
+
+            t2 = time.time()
+
+            if rep['errors'] == False:
+                channel.basic_ack(delivery_tag = method.delivery_tag)
+                #print("")
+                logging.info("Done in %s seconds." % (t2-t1))
+            else:
+                channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+                #print("")
+                logging.error(json.dumps(rep, indent=4))
+                logging.error("Failed")
+
+    else:
+
+        time.sleep(5)
+        channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+
+    return
+
+
+def main(cfg):
+
+    from utils.close_connection import on_timeout
+
+    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
+    timeout = 5
+    connection.add_timeout(timeout, on_timeout(connection))
+
+    channel = connection.channel()
+    exchange = cfg['rabbitmq']['exchange']
+    # the queue this program will consume messages from:
+    reindex_tasks_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_6_suffix']
+
+    channel.basic_qos(prefetch_count=1)
+    channel.basic_consume(lambda ch, method, properties, body:
+                                do_something(ch, method, properties, body),
+                          queue=reindex_tasks_qn)#, no_ack=True)
+    channel.start_consuming()
+    connection.close()
+
+
+    return
+
+
+if __name__ == "__main__":
+
+    import yaml
+    import signal
+
+    signal.signal(signal.SIGINT, exit_gracefully)
+
+    with open("config.yaml", 'r') as yamlfile:
+        cfg = yaml.load(yamlfile)
+
+    while True:
+
+        try:
+            main(cfg)
+        except pika.exceptions.ChannelClosed:
+            logging.info("Waiting for tasks...")
+            time.sleep(5)
+        except Exception as e:
+            logging.error(e)
+            exit(1)