import hashlib
import json
import time

import msgpack
import pika
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import AuthorizationException

from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging


def tag_doc(the_doc):
    """Derive editorial tags from a document's metadata-fr / data-fr fields and
    return a copy of the document carrying them in an 'editorial-metadata' field."""
    tag_dict = {}

    # isOpenAccess? Open unless a licence-related legal constraint is present.
    if 'legalConstraints' not in the_doc['metadata-fr'].keys() \
            or not any(x in the_doc['metadata-fr']['legalConstraints']
                       for x in ['Licence Associée', 'Licence Engagée']):
        tag_dict['isOpenAccess'] = True
    else:
        tag_dict['isOpenAccess'] = False

    # isRealTime? True for continually updated datasets.
    if 'updateFrequency' in the_doc['metadata-fr'].keys() \
            and 'continual' in the_doc['metadata-fr']['updateFrequency']:
        tag_dict['isRealTime'] = True
    else:
        tag_dict['isRealTime'] = False

    # isQueryable? True if at least one link points to a queryable service.
    tag_dict['isQueryable'] = False  # default
    if 'link' in the_doc['metadata-fr'].keys():
        for link in the_doc['metadata-fr']['link']:
            if 'service' in link.keys() \
                    and any(x in link['service'] for x in ['WFS', 'WMS', 'KML', 'WS']):
                tag_dict['isQueryable'] = True
                break

    # N.B.: the geometry-related tags below require the data-fr field; when
    # data-fr is absent, 'isPunctual', 'isLinear' and 'isAreal' are left out
    # instead of being 'falsely' set to False!

    # isSearchable?
    tag_dict['isSearchable'] = False  # default
    if 'data-fr' in the_doc.keys():
        tag_dict['isSearchable'] = True

        if 'geometry' in the_doc['data-fr'].keys():
            geom_type = the_doc['data-fr']['geometry']['type']

            # isPunctual? isLinear? isAreal?
            tag_dict['isPunctual'] = any(x in geom_type for x in ['Point', 'MultiPoint'])
            tag_dict['isLinear'] = any(x in geom_type for x in ['LineString', 'MultiLineString'])
            tag_dict['isAreal'] = any(x in geom_type for x in ['Polygon', 'MultiPolygon'])

    # isSample? Docs that are tagged by this script are never just a sample.
    tag_dict['isSample'] = False

    tagged_doc = {'editorial-metadata': tag_dict, **the_doc}

    return tagged_doc
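# A quick illustration of tag_doc() on hypothetical metadata (the field values
# below are made up for the example; only the key names come from the code above):
#
# >>> doc = {'metadata-fr': {'updateFrequency': 'continual',
# ...                        'link': [{'service': 'WFS'}]}}
# >>> tag_doc(doc)['editorial-metadata']
# {'isOpenAccess': True, 'isRealTime': True, 'isQueryable': True,
#  'isSearchable': False, 'isSample': False}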
def index_docs(channel, method, properties, body, es):
    """RabbitMQ callback: tag the incoming documents and push them to
    Elasticsearch through the bulk API, ack'ing the message on success."""
    t1 = time.time()

    decoded_body = msgpack.unpackb(body, raw=False)

    if isinstance(decoded_body['body'], list):
        docs_to_index = [tag_doc(doc) for doc in decoded_body['body']]
    else:
        docs_to_index = [tag_doc(decoded_body['body'])]

    logging.info("Pushing %i documents to Elasticsearch..." % len(docs_to_index))

    # Build the NDJSON bulk body: one action line followed by one document line
    # per doc. The action header (incl. the target '_index') comes from the
    # message itself, with a deterministic _id derived from the document content;
    # the index is not created here and is expected to pre-exist or to be
    # auto-created by Elasticsearch.
    es_body = ''
    header = decoded_body['header']
    for doc in docs_to_index:
        header['index']['_id'] = hashlib.md5(json.dumps(doc, sort_keys=True).encode("utf-8")).hexdigest()
        es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))

    rep = es.bulk(body=es_body)

    t2 = time.time()

    if not rep['errors']:
        channel.basic_ack(delivery_tag=method.delivery_tag)
        logging.info("Done in %s seconds." % (t2 - t1))
    else:
        # Requeue the message so that the documents can be indexed later.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        logging.error(json.dumps(rep, indent=4))
        logging.error("Failed")

    return


def main(cfg):
    from utils.close_connection import on_timeout

    es = Elasticsearch([cfg['indexer']['url']], timeout=60)
    es_logger = logging.getLogger('elasticsearch')
    es_logger.setLevel(logging.INFO)

    # N.B.: add_timeout and the basic_consume signature below are the pika < 1.0 API.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    timeout = 5
    connection.add_timeout(timeout, on_timeout(connection))
    channel = connection.channel()

    # the queue this program will be consuming messages from
    docs_to_index_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_2_suffix']

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(lambda ch, method, properties, body: index_docs(ch, method, properties, body, es),
                          queue=docs_to_index_qn)
    channel.start_consuming()
    connection.close()


if __name__ == '__main__':
    import signal
    import yaml

    signal.signal(signal.SIGINT, exit_gracefully)

    with open("config.yaml", 'r') as yamlfile:
        cfg = yaml.safe_load(yamlfile)

    while True:
        try:
            main(cfg)
        except pika.exceptions.ChannelClosed:
            logging.info("Waiting for tasks...")
            time.sleep(5)
        except AuthorizationException as e:
            logging.error(e)
            exit(1)
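# For reference, a minimal config.yaml sketch matching the keys read above
# (the values are placeholders, not the actual deployment settings):
#
# session:
#   id: some-session-id
# rabbitmq:
#   host: localhost
#   queue_name_2_suffix: docs-to-index
# indexer:
#   url: http://localhost:9200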