Skip to content
Snippets Groups Projects
7-doc-indexer.py 6.96 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import pika
    import msgpack
    import time
    import json
    from elasticsearch import Elasticsearch
    from elasticsearch.exceptions import AuthorizationException
    import hashlib
    from utils.exit_gracefully import exit_gracefully
    from utils.my_logging import logging
    
    from pprint import pprint
    
    def tag_doc( the_doc ):
        """Return a copy of ``the_doc`` with an 'editorial-metadata' dict of boolean tags.

        Args:
            the_doc: mapping with a mandatory 'metadata-fr' mapping and an
                optional 'data-fr' mapping.

        Returns:
            A new dict ``{'editorial-metadata': tags, **the_doc}``; ``the_doc``
            itself is left untouched. Keys already present in ``the_doc`` take
            precedence, so an existing 'editorial-metadata' entry is kept as is.

        Raises:
            KeyError: if ``the_doc`` has no 'metadata-fr' entry, or if a
                'geometry' entry has no 'type'.
        """
        metadata = the_doc['metadata-fr']
        tag_dict = {}

        # isOpenAccess? open unless one of the restrictive licences is listed
        restrictive_licences = ('Licence Associée', 'Licence Engagée')
        tag_dict['isOpenAccess'] = (
            'legalConstraints' not in metadata
            or not any(x in metadata['legalConstraints'] for x in restrictive_licences)
        )

        # isRealTime?
        tag_dict['isRealTime'] = (
            'updateFrequency' in metadata
            and 'continual' in metadata['updateFrequency']
        )

        # isQueryable? true as soon as one link exposes a queryable service
        queryable_services = ('WFS', 'WMS', 'KML', 'WS')
        tag_dict['isQueryable'] = False # default
        if 'link' in metadata:
            for link in metadata['link']:
                if 'service' in link and any(x in link['service'] for x in queryable_services):
                    # NOTE(review): the body of this test was missing from the
                    # reviewed copy of the file; setting the tag is the evident
                    # intent given the False default above -- confirm against
                    # the repository history.
                    tag_dict['isQueryable'] = True
                    break

        # isSearchable? true whenever the document carries a data-fr field
        tag_dict['isSearchable'] = 'data-fr' in the_doc

        # N.B.: the geometry tags need data-fr.geometry; when it is absent the
        # tags 'isPunctual', 'isLinear', 'isAreal' are left out instead of
        # being 'falsely' set to false!
        if 'data-fr' in the_doc and 'geometry' in the_doc['data-fr']:
            geometry_type = the_doc['data-fr']['geometry']['type']

            tag_dict['isPunctual'] = any(x in geometry_type for x in ('Point', 'MultiPoint'))
            tag_dict['isLinear']   = any(x in geometry_type for x in ('LineString', 'MultiLineString'))
            tag_dict['isAreal']    = any(x in geometry_type for x in ('Polygon', 'MultiPolygon'))

        # isSample? docs that are tagged by this script are never just a sample
        tag_dict['isSample'] = False

        tagged_doc = {'editorial-metadata': tag_dict, **the_doc}

        return tagged_doc
    
    
    def index_docs(channel, method, properties, body, es):
        """Tag the documents carried by one queue message and bulk-index them.

        Args:
            channel: channel the message was delivered on; used to ack/nack it.
            method: delivery metadata; only ``method.delivery_tag`` is read.
            properties: message properties (unused).
            body: msgpack-encoded mapping with a 'header' entry (the bulk action
                line, expected to hold an 'index' mapping) and a 'body' entry
                (a single document or a list of documents).
            es: client object exposing ``bulk(body=...)``.

        The message is acked when the bulk response reports no errors, and
        nacked with requeue otherwise.
        """
        t1 = time.time()

        decoded_body = msgpack.unpackb(body, raw=False)

        # a message carries either one document or a list of documents
        payload = decoded_body['body']
        if not isinstance(payload, list):
            payload = [payload]
        docs_to_index = [tag_doc(doc) for doc in payload]

        logging.info("Pushing %i documents to Elasticsearch..." % len(docs_to_index))

        if not docs_to_index:
            # An empty batch would yield an empty bulk body, which the bulk call
            # rejects; there is nothing to index, so just acknowledge the message.
            channel.basic_ack(delivery_tag=method.delivery_tag)
            logging.info("Done in %s seconds." % (time.time() - t1))
            return

        header = decoded_body['header']
        bulk_lines = []
        for doc in docs_to_index:
            # deterministic _id: hash of the canonical (key-sorted) JSON of the
            # document, so that re-indexing the same document overwrites it
            canonical = json.dumps(doc, sort_keys=True).encode("utf-8")
            header['index']['_id'] = hashlib.md5(canonical).hexdigest()

            bulk_lines.append(json.dumps(header))
            bulk_lines.append(json.dumps(doc))

        # newline-delimited action/source pairs, with a trailing newline
        es_body = '\n'.join(bulk_lines) + '\n'

        rep = es.bulk(body=es_body)

        t2 = time.time()

        if not rep['errors']:
            channel.basic_ack(delivery_tag=method.delivery_tag)
            logging.info("Done in %s seconds." % (t2-t1))
        else:
            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

            logging.error(json.dumps(rep, indent=4))
            logging.error("Failed")
    
    
    def main(cfg):
        """Consume the documents queue and push every message to Elasticsearch.

        Args:
            cfg: configuration mapping; reads ``cfg['indexer']['url']``,
                ``cfg['rabbitmq']['host']``, ``cfg['rabbitmq']['queue_name_2_suffix']``
                and ``cfg['session']['id']``.

        Blocks in ``channel.start_consuming()``. The RabbitMQ connection is
        closed on the way out, whether the function returns or raises, so that
        a caller retrying in a loop does not pile up open connections.
        """
        from utils.close_connection import on_timeout

        es = Elasticsearch([cfg['indexer']['url']], timeout=60)

        es_logger = logging.getLogger('elasticsearch')
        es_logger.setLevel(logging.INFO)

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))

        try:
            # NOTE(review): on_timeout is a project helper; presumably it
            # returns the callback run after `timeout` seconds -- confirm.
            timeout = 5
            connection.add_timeout(timeout, on_timeout(connection))

            channel = connection.channel()

            # the queue this program will be consuming messages from
            docs_to_index_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_2_suffix']

            def on_message(ch, method, properties, body):
                # bind the Elasticsearch client to the consumer callback
                index_docs(ch, method, properties, body, es)

            # one unacknowledged message at a time
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(on_message, queue=docs_to_index_qn)#, no_ack=True)
            channel.start_consuming()
        finally:
            # the connection may already have been closed elsewhere
            if connection.is_open:
                connection.close()
    
    
    if __name__ == '__main__':

        import signal
        import sys
        import time

        import yaml

        signal.signal(signal.SIGINT, exit_gracefully)

        # safe_load instead of yaml.load: calling yaml.load without an explicit
        # Loader is deprecated and rejected by recent PyYAML releases, and a
        # configuration file only needs plain YAML types anyway.
        with open("config.yaml", 'r') as yamlfile:
            cfg = yaml.safe_load(yamlfile)

        # Run the consumer forever: when the channel gets closed, wait a bit
        # and start over; give up only on an Elasticsearch authorization error.
        while True:

            try:
                main(cfg)
            except pika.exceptions.ChannelClosed:
                logging.info("Waiting for tasks...")
                time.sleep(5)
            except AuthorizationException as e:
                logging.error(e)
                # sys.exit rather than the interactive-only exit() helper
                sys.exit(1)