Skip to content
Snippets Groups Projects
2-metadata-processor.py 16.2 KiB
Newer Older
  • Learn to ignore specific revisions
  • Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    import pika
    import msgpack
    from collections import OrderedDict
    import urllib.parse
    import time
    from dateutil.parser import parse
    import hashlib
    import json
    from utils.exit_gracefully import exit_gracefully
    import re
    from utils.my_logging import logging
    from utils.fix_links import fix_links
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    from utils.enrich_links import enrich_links
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    
    
    def list_to_dictlist( the_input, the_context=None ):
        """
        Transform some of the lists (responsibleParty, link, ...) returned by GeoNetwork's q API into a list of dicts.

        Each item is a string of '|'-separated sub-fields. The sub-fields are stored
        under the names listed for `the_context` in the mapping below, or under their
        positional index (0, 1, ...) when `the_context` is None. Empty sub-fields are
        stored as the string 'null'. Every resulting dict is prefixed with an 'md5' key
        holding a hash of its content.

        :param the_input: a single '|'-separated string, or a list of such strings.
        :param the_context: one of 'responsibleParty', 'link', 'image',
            'inspirethemewithac', 'geoBox', or None.
        :return: the list of dicts; for the 'geoBox' context, a
            {'type': 'Polygon', 'coordinates': [[...]]} dict built from the first item instead.
        :raises KeyError: if an item has more sub-fields than names defined for `the_context`.
        :raises IndexError: if `the_context` is 'geoBox' and `the_input` is empty.
        :raises ValueError: if a geoBox coordinate cannot be converted to float.
        """

        dictionary = dict()

        dictionary['responsibleParty'] = {  0: "role",
                                            1: "appliesTo",
                                            2: "organisationName",
                                            3: "logo",
                                            4: "email",
                                            5: "individualName",
                                            6: "positionName",
                                            7: "address",
                                            8: "telephone" }


        dictionary['link'] = {  0: "name",
                                1: "description",
                                2: "url",
                                3: "protocol",
                                4: "content-type",
                                5: "unknown" }

        dictionary['image'] = {0: "type", 1: "url"}

        dictionary['inspirethemewithac'] = {0: "id", 1: "label"}

        dictionary['geoBox'] = {0: "westBoundLongitude", 1: "southBoundLatitude", 2: "eastBoundLongitude", 3: "northBoundLatitude"}

        # in case the input is not already a list, we convert it to a list
        if isinstance( the_input, str ):
            the_input = [the_input]

        the_output = []
        for in_item in the_input:

            out_item = {}
            for k, value in enumerate(in_item.split('|')):
                # without a context, the positional index is used as key;
                # this must also hold for empty sub-fields (it used to raise KeyError: None)
                key = k if the_context is None else dictionary[the_context][k]
                # empty sub-fields are kept, with the 'null' string as placeholder
                out_item[key] = value if value != "" else 'null'

            logging.debug(the_context)
            logging.debug(out_item)

            # appending a hash value of the item can be useful at client-side,
            # as it allows one to identify entities that are common to multiple datasets...
            # (the hash is computed on the item as parsed, before any rearrangement below)
            if the_context == 'responsibleParty':
                # as a matter of facts, 'appliesTo' isn't really a property of the responsibleParty
                hashed = { key: value for key, value in out_item.items() if key != 'appliesTo' }
            else:
                hashed = out_item
            md5 = hashlib.md5( json.dumps(hashed, sort_keys=True).encode("utf-8") ).hexdigest()

            # In the case of dataset produced by the Métropole de Lyon,
            # the "organisationName" contains "Métropole de Lyon"
            # followed by the name of the direction.
            # The direction is moved to the front of "individualName".
            if the_context == 'responsibleParty' and 'organisationName' in out_item:

                try:
                    parent_organisation, child_organisation = out_item['organisationName'].split('/')
                except ValueError:
                    # no '/' or more than one: the name is kept as a whole
                    parent_organisation, child_organisation = out_item['organisationName'], None

                out_item['organisationName'] = parent_organisation.strip()

                if child_organisation is not None:
                    child_organisation = child_organisation.strip()
                    if 'individualName' in out_item:
                        out_item['individualName'] = child_organisation + ' / ' + out_item['individualName']
                    else:
                        out_item['individualName'] = child_organisation

            the_output.append( {'md5': md5, **out_item} )

        if the_context == 'geoBox':

            # only the first bounding box is taken into account
            box = the_output[0]
            west = float(box['westBoundLongitude'])
            south = float(box['southBoundLatitude'])
            east = float(box['eastBoundLongitude'])
            north = float(box['northBoundLatitude'])

            # closed ring: the first corner is repeated at the end
            polygon = [[west, south], [east, south], [east, north], [west, north], [west, south]]

            the_output = {'type': 'Polygon', 'coordinates': [polygon]}

        return the_output
    
    
    
    Alessandro Cerioni's avatar
    Alessandro Cerioni committed
    def process_records( in_records, geonetwork_root_url, working_directory ):
        """
        Convert raw GeoNetwork q-API records into the documents sent downstream.

        Each input record is shallow-copied and mounted under the 'metadata-fr' key of a
        new dict. Then:
          - '|'-separated list fields are expanded into lists of dicts by list_to_dictlist
            ('geoBox' becomes 'bbox'); links are also passed through fix_links/enrich_links;
          - fields which may hold either a string or a list are normalised to lists;
          - 'creationDate' is normalised with dateutil (and dropped if unparsable);
          - GeoNetwork-specific attributes are dropped;
          - duplicates in 'updateFrequency' are removed (first-seen order kept);
          - an 'href' pointing back to the record on GeoNetwork is added.

        :param in_records: list of record dicts; each must contain ['geonet:info']['uuid'].
        :param geonetwork_root_url: base URL used to build each record's 'href'.
        :param working_directory: forwarded to enrich_links().
        :return: list of {'metadata-fr': {...}} dicts, in input order.
        """

        # fields which may come as a plain string (single value) and are normalised to lists
        fields_to_listify = ['category', 'topicCat', 'keyword', 'legalConstraints', 'resourceConstraints']

        properties_to_convert_to_date = ['creationDate']

        # attributes which are very specific to GeoNetwork
        attribs_to_delete = ['userinfo', 'isHarvested', 'isTemplate', 'owner',
                             'displayOrder', 'publishedForGroup', 'valid', 'docLocale',
                             'popularity', 'mdLanguage', 'root', 'rating', 'source',
                             'defaultTitle', 'datasetLang', 'geoDesc', 'locale', 'logo',
                             'idxMsg']

        out_records = []

        for in_record in in_records:

            # all the content of the original record is "mounted" at "metadata-fr"
            # (shallow copy: nested values are shared with in_record)
            metadata = in_record.copy()
            out_record = {'metadata-fr': metadata}

            # converted fields are popped and re-inserted, so that they end up
            # after the untouched ones in the output document
            if '_locale' in metadata:
                metadata['locale'] = metadata.pop('_locale')

            # we transform some lists into lists of dictionaries...
            if 'responsibleParty' in metadata:
                metadata['responsibleParty'] = list_to_dictlist(metadata.pop('responsibleParty'), 'responsibleParty')

            if 'link' in metadata:
                links = list_to_dictlist(metadata.pop('link'), 'link')
                metadata['link'] = enrich_links( fix_links(links), working_directory )

            # 'userinfo' is not converted: it is dropped below together with
            # the other GeoNetwork-specific attributes

            for field in fields_to_listify:
                if isinstance(metadata.get(field), str):
                    metadata[field] = [metadata.pop(field)]

            # records without these fields are non-geographic datasets
            if 'geoBox' in metadata:
                metadata['bbox'] = list_to_dictlist(metadata.pop('geoBox'), 'geoBox')

            if 'inspirethemewithac' in metadata:
                metadata['inspirethemewithac'] = list_to_dictlist(metadata.pop('inspirethemewithac'), 'inspirethemewithac')

            if 'image' in metadata:
                metadata['image'] = list_to_dictlist(metadata.pop('image'), 'image')

            for prop in properties_to_convert_to_date:
                if prop in metadata:
                    try:
                        metadata[prop] = str(parse(in_record[prop]))
                    except (ValueError, OverflowError, TypeError):
                        logging.error('Unable to parse date in metadata: %s', in_record[prop])
                        del metadata[prop]

            for attrib in attribs_to_delete:
                metadata.pop(attrib, None)

            # let's delete duplicates in the 'updateFrequency' attribute;
            # OrderedDict.fromkeys keeps the first-seen order, so the output is deterministic
            update_frequency = in_record.get('updateFrequency')
            if isinstance(update_frequency, list):
                metadata['updateFrequency'] = list(OrderedDict.fromkeys(update_frequency))

            # let's generate the href of this document
            local_params = OrderedDict([
                ('_content_type', 'json'),
                ('resultType', 'details'),
                ('fast', 'index'),
                ('buildSummary', 'false'),
                ('uuid', metadata['geonet:info']['uuid']),
            ])

            metadata['href'] = geonetwork_root_url + "?" + urllib.parse.urlencode(local_params)

            out_records.append(out_record)

        return out_records
    
    
    
    
    # def process_page( channel, method, properties, body, out_channel, out_exchange, out_routing_key ):
    def process_page( channel, method, properties, body, **kwargs):
        """
        RabbitMQ consumer callback: process one page of GeoNetwork metadata records.

        The message body is msgpack-encoded, with a 'header' (geonetwork_root_url,
        session_id, dest_index) and a 'body' (list of raw records). The records are
        processed by process_records(), then each one is dispatched:
          - if it has a WFS link whose url contains 'data.grandlyon.com', a '<uuid>.full'
            document is published with routing key kwargs['docs_to_enrich_rk'],
            together with that link;
          - otherwise, a '<uuid>.meta' document is published with routing key
            kwargs['docs_to_index_rk'].
        The incoming message is acknowledged once every record has been published.

        Expected kwargs: exchange, docs_to_index_rk, docs_to_enrich_rk, working_directory.
        """

        decoded_body = msgpack.unpackb(body, raw=False)

        header = decoded_body['header']
        geonetwork_root_url = header['geonetwork_root_url']
        session_id          = header['session_id']
        dest_index          = header['dest_index']

        out_records = process_records( decoded_body['body'], geonetwork_root_url, kwargs['working_directory'] )

        for metadata_record in out_records:

            metadata = metadata_record['metadata-fr']
            the_uuid = metadata['geonet:info']['uuid']
            logging.info("Processing record %s..." % the_uuid)

            # let's look for a WFS ressource to potentially fetch and index...
            # links lacking 'service' or 'url' are skipped instead of crashing the callback
            # (a crash here would leave the page un-acknowledged)
            #TODO: generalize!
            wfs_link = next( (link for link in metadata.get('link', [])
                              if link.get('service') == 'WFS' and 'data.grandlyon.com' in (link.get('url') or '')),
                             None )

            if wfs_link is not None:

                logging.debug('EUREKA : found a WFS ressource!')

                full_version = metadata_record.copy() # including metadata AND data
                full_version['uuid'] = the_uuid + '.full'
                full_version['type'] = metadata['type']

                msg = {'header': {'wfs_info': wfs_link, 'offset': 0, 'session_id': session_id, 'dest_index': dest_index}, 'body': full_version}
                routing_key = kwargs['docs_to_enrich_rk']

            else:

                meta_version = metadata_record.copy() # including metadata ONLY
                meta_version['uuid'] = the_uuid + '.meta'
                meta_version['type'] = metadata['type']

                msg = {'header': { "index" : { "_index" : dest_index, "_type" : "_doc" } }, 'body': meta_version}
                routing_key = kwargs['docs_to_index_rk']

            # delivery_mode=2: persistent message
            channel.basic_publish( exchange=kwargs['exchange'],
                                   routing_key=routing_key,
                                   body=msgpack.packb(msg, use_bin_type=True),
                                   properties=pika.BasicProperties(delivery_mode = 2) )

        channel.basic_ack(delivery_tag = method.delivery_tag)
        return
    
    
    def main(cfg):
        """
        Consume pages of metadata records from RabbitMQ and dispatch the processed records.

        Declares the exchange and the two output queues (documents to index, documents
        to enrich) and binds them. It then consumes the queue
        '<session id>_<queue_name_1_suffix>' with process_page as callback, fetching one
        message at a time (prefetch_count=1). Blocks in start_consuming(). The
        connection is closed when consuming stops, including when it stops with an
        exception.

        :param cfg: configuration dict with 'rabbitmq' (host, exchange, queue/routing-key
            suffixes) and 'session' (id, working_directory) sections.
        """

        from utils.close_connection import on_timeout

        rabbitmq_cfg = cfg['rabbitmq']
        session_id = cfg['session']['id']

        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_cfg['host']))

        try:
            # NOTE(review): the callback is built by utils.close_connection.on_timeout;
            # presumably it closes the connection after `timeout` seconds — confirm
            timeout = 5
            connection.add_timeout(timeout, on_timeout(connection))

            channel = connection.channel()
            exchange = rabbitmq_cfg['exchange']

            # the queue this program will consume messages from:
            metadata_pages_to_process_qn = session_id + '_' + rabbitmq_cfg['queue_name_1_suffix']

            docs_to_index_rk = session_id + '_' + rabbitmq_cfg['routing_key_2_suffix']
            docs_to_index_qn = session_id + '_' + rabbitmq_cfg['queue_name_2_suffix']

            docs_to_enrich_rk = session_id + '_' + rabbitmq_cfg['routing_key_3_suffix']
            docs_to_enrich_qn = session_id + '_' + rabbitmq_cfg['queue_name_3_suffix']

            channel.exchange_declare(exchange=exchange, exchange_type='direct')

            channel.queue_declare(queue=docs_to_index_qn, durable=True)
            channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk)
            channel.queue_declare(queue=docs_to_enrich_qn, durable=True)
            channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk)

            working_directory = cfg['session']['working_directory']

            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(lambda ch, method, properties, body: process_page(ch, method, properties, body,
                                                                        exchange=exchange,
                                                                        docs_to_index_rk=docs_to_index_rk,
                                                                        docs_to_enrich_rk=docs_to_enrich_rk,
                                                                        working_directory=working_directory),
                                  queue=metadata_pages_to_process_qn)

            channel.start_consuming()

        finally:
            # the __main__ loop calls main() again after ChannelClosed:
            # without this, every retry would leave the previous connection open
            if connection.is_open:
                connection.close()
    
    
    if __name__ == '__main__':

        import sys
        import yaml
        import time
        import signal
        signal.signal(signal.SIGINT, exit_gracefully)

        with open("config.yaml", 'r', encoding='utf-8') as yamlfile:
            # safe_load: yaml.load without an explicit Loader is unsafe (deprecated since
            # PyYAML 5.1) and raises TypeError on PyYAML >= 6
            cfg = yaml.safe_load(yamlfile)

        logging.info('Starting...')

        # main() returns or raises ChannelClosed when consuming stops; retry after a pause
        while True:
            try:
                main(cfg)
            except pika.exceptions.ChannelClosed:
                logging.info('Waiting for tasks...')
                time.sleep(5)
            except Exception as e:
                # logging.exception also records the traceback
                logging.exception(e)
                sys.exit(1)