import pika
import msgpack
from collections import OrderedDict
import urllib.parse
import time
from dateutil.parser import parse
import hashlib
import json
from utils.exit_gracefully import exit_gracefully
import re
from utils.my_logging import logging
from utils.fix_links import fix_links
from utils.enrich_links import enrich_links


def list_to_dictlist( the_input, the_context=None ):
    """
    This function transforms some of the lists (responsibleParty, link, ...)
    returned by GeoNetwork's q API into a list of dicts.
    """

    # positional fields of the pipe-separated strings, per context
    dictionary = dict()

    dictionary['responsibleParty'] = {
        0: "role",
        1: "appliesTo",
        2: "organisationName",
        3: "logo",
        4: "email",
        5: "individualName",
        6: "positionName",
        7: "address",
        8: "telephone"
    }

    dictionary['link'] = {
        0: "name",
        1: "description",
        2: "url",
        3: "protocol",
        4: "content-type",
        5: "unknown"
    }

    dictionary['image'] = {0: "type", 1: "url"}

    dictionary['inspirethemewithac'] = {0: "id", 1: "label"}

    dictionary['geoBox'] = {0: "westBoundLongitude",
                            1: "southBoundLatitude",
                            2: "eastBoundLongitude",
                            3: "northBoundLatitude"}

    # in case the input is not already a list, we convert it to a list
    if type( the_input ) is str:
        the_input = [the_input]

    the_output = []

    for in_item in the_input:

        in_item_split = in_item.split('|')

        out_item = {}

        for k, line in enumerate(in_item_split):
            if line != "":
                if the_context != None:
                    out_item[ dictionary[the_context][k] ] = line
                else:
                    out_item[ k ] = line
            else:
                out_item[ dictionary[the_context][k] ] = 'null'

        logging.debug(the_context)
        logging.debug(out_item)

        # appending a hash value of the item can be useful at client-side,
        # as it allows one to identify entities that are common to multiple datasets...
        if the_context == 'responsibleParty':
            tmp = out_item.copy()
            # as a matter of fact, 'appliesTo' isn't really a property of the responsibleParty
            del tmp['appliesTo']
            md5 = hashlib.md5( json.dumps(tmp, sort_keys=True).encode("utf-8") ).hexdigest()
        else:
            md5 = hashlib.md5( json.dumps(out_item, sort_keys=True).encode("utf-8") ).hexdigest()

        # In the case of datasets produced by the Métropole de Lyon,
        # the "organisationName" contains "Métropole de Lyon"
        # followed by the name of the direction.
        # In the following, we arrange things differently...
        if the_context == 'responsibleParty':

            try:
                parent_organisation, child_organisation = out_item['organisationName'].split('/')
                parent_organisation = parent_organisation.strip()
                child_organisation = child_organisation.strip()
            except:
                parent_organisation = out_item['organisationName'].strip()
                child_organisation = None

            out_item['organisationName'] = parent_organisation

            if child_organisation != None:
                if 'individualName' in out_item.keys():
                    out_item['individualName'] = child_organisation + ' / ' + out_item['individualName']
                else:
                    out_item['individualName'] = child_organisation

        the_output.append( {'md5': md5, **out_item} )

    if the_context == 'geoBox':

        polygon = []
        polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])])
        polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['southBoundLatitude'])])
        polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['northBoundLatitude'])])
        polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['northBoundLatitude'])])
        polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])])

        the_output = {'type': 'Polygon', 'coordinates': [polygon]}

    return the_output
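
# Illustration (not part of the original code): for the 'link' context, a pipe-separated
# string returned by GeoNetwork is mapped onto named fields by position. The sample
# values below are hypothetical; only the field order comes from the 'link' mapping above.
#
#   list_to_dictlist("roads|Road network|https://example.org/wfs|OGC:WFS|application/json", 'link')
#   # -> [{'md5': '<hash of the item>',
#   #      'name': 'roads',
#   #      'description': 'Road network',
#   #      'url': 'https://example.org/wfs',
#   #      'protocol': 'OGC:WFS',
#   #      'content-type': 'application/json'}]
#
# With the_context == 'geoBox', the function instead returns a GeoJSON Polygon built
# from the four bounds, as implemented at the end of the function above.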


def process_records( in_records, geonetwork_root_url, working_directory ):

    out_records = []

    for in_record in in_records:

        out_record = {}

        # all the content of the original record is "mounted" under "metadata-fr"
        out_record['metadata-fr'] = in_record.copy()

        if '_locale' in out_record['metadata-fr'].keys():
            del out_record['metadata-fr']['_locale']
            out_record['metadata-fr']['locale'] = in_record['_locale']

        # we transform some lists into lists of dictionaries...
        if 'responsibleParty' in out_record['metadata-fr'].keys():
            del out_record['metadata-fr']['responsibleParty']
            out_record['metadata-fr']['responsibleParty'] = list_to_dictlist(in_record['responsibleParty'], 'responsibleParty')

        if 'link' in out_record['metadata-fr'].keys():
            del out_record['metadata-fr']['link']
            tmp = list_to_dictlist(in_record['link'], 'link')
            out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp), working_directory )

        if 'userinfo' in out_record['metadata-fr'].keys():
            del out_record['metadata-fr']['userinfo']
            out_record['metadata-fr']['userinfo'] = list_to_dictlist(in_record['userinfo'])

        # ...and we make sure that some attributes are always lists, even when single-valued
        if 'category' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['category']) is str:
            del out_record['metadata-fr']['category']
            out_record['metadata-fr']['category'] = [in_record['category']]

        if 'topicCat' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['topicCat']) is str:
            del out_record['metadata-fr']['topicCat']
            out_record['metadata-fr']['topicCat'] = [in_record['topicCat']]

        if 'keyword' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['keyword']) is str:
            del out_record['metadata-fr']['keyword']
            out_record['metadata-fr']['keyword'] = [in_record['keyword']]

        if 'legalConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['legalConstraints']) is str:
            del out_record['metadata-fr']['legalConstraints']
            out_record['metadata-fr']['legalConstraints'] = [in_record['legalConstraints']]

        if 'resourceConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['resourceConstraints']) is str:
            del out_record['metadata-fr']['resourceConstraints']
            out_record['metadata-fr']['resourceConstraints'] = [in_record['resourceConstraints']]

        if 'geoBox' in out_record['metadata-fr'].keys():
            del out_record['metadata-fr']['geoBox']
            out_record['metadata-fr']['bbox'] = list_to_dictlist(in_record['geoBox'], 'geoBox')
        else:
            pass  # it means that we are treating a non-geographic dataset

        if 'inspirethemewithac' in out_record['metadata-fr'].keys():
            del out_record['metadata-fr']['inspirethemewithac']
            out_record['metadata-fr']['inspirethemewithac'] = list_to_dictlist(in_record['inspirethemewithac'], 'inspirethemewithac')
        else:
            pass  # it means that we are treating a non-geographic dataset

        if 'image' in out_record['metadata-fr'].keys():
            del out_record['metadata-fr']['image']
            out_record['metadata-fr']['image'] = list_to_dictlist(in_record['image'], 'image')
        else:
            pass  # no image is associated with this record

        properties_to_convert_to_date = ['creationDate']

        for prop in properties_to_convert_to_date:
            if prop in out_record['metadata-fr'].keys():
                try:
                    out_record['metadata-fr'][prop] = str(parse(in_record[prop]))
                except:  # TODO
                    logging.error('Unable to parse date in metadata: %s' % in_record[prop])
                    del out_record['metadata-fr'][prop]

        # let's delete some attributes which are very specific to GeoNetwork
        attribs_to_delete = ['userinfo', 'isHarvested', 'isTemplate', 'owner',
                             'displayOrder', 'publishedForGroup', 'valid', 'docLocale',
                             'popularity', 'mdLanguage', 'root', 'rating', 'source',
                             'defaultTitle', 'datasetLang', 'geoDesc', 'locale', 'logo']

        for attrib in attribs_to_delete:
            try:
                del out_record['metadata-fr'][attrib]
            except:
                pass

        if 'idxMsg' in in_record.keys():
            del out_record['metadata-fr']['idxMsg']

        # let's delete duplicates in the 'updateFrequency' attribute
        if 'updateFrequency' in in_record.keys() and type(in_record['updateFrequency']) is list:
            out_record['metadata-fr']['updateFrequency'] = list(set(in_record['updateFrequency']))

        # let's generate the href of this document
        local_params = OrderedDict()
        local_params['_content_type'] = 'json'
        local_params['resultType'] = 'details'
        local_params['fast'] = 'index'
        local_params['buildSummary'] = 'false'
        local_params['uuid'] = out_record['metadata-fr']['geonet:info']['uuid']

        out_record['metadata-fr']['href'] = geonetwork_root_url + "?" + urllib.parse.urlencode(local_params)

        out_records.append(out_record)

    return out_records
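
# Rough shape of a processed record (a sketch for orientation only; the exact set of
# fields depends on the GeoNetwork response and on fix_links()/enrich_links()):
#
#   {
#       'metadata-fr': {
#           'responsibleParty': [{'md5': ..., 'role': ..., 'organisationName': ..., ...}],
#           'link': [{'md5': ..., 'name': ..., 'url': ..., 'protocol': ..., ...}],
#           'bbox': {'type': 'Polygon', 'coordinates': [[...]]},   # geographic datasets only
#           'href': '<geonetwork_root_url>?_content_type=json&...&uuid=<uuid>',
#           'geonet:info': {'uuid': ..., ...},
#           ...                                                    # remaining GeoNetwork fields
#       }
#   }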


def process_page( channel, method, properties, body, **kwargs ):

    decoded_body = msgpack.unpackb(body, raw=False)

    geonetwork_root_url = decoded_body['header']['geonetwork_root_url']
    session_id = decoded_body['header']['session_id']
    dest_index = decoded_body['header']['dest_index']

    page = decoded_body['body']

    out_records = process_records( page, geonetwork_root_url, kwargs['working_directory'] )

    # dispatch
    for metadata_record in out_records:

        the_uuid = metadata_record['metadata-fr']['geonet:info']['uuid']
        logging.info("Processing record %s..." % the_uuid)

        # let's look for a WFS resource to potentially fetch and index...
        wfs_found = False

        if 'link' in metadata_record['metadata-fr'].keys():

            for link in metadata_record['metadata-fr']['link']:

                # TODO: generalize!
                if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']:

                    logging.debug('EUREKA: found a WFS resource!')
                    wfs_found = True

                    full_version = metadata_record.copy()  # including metadata AND data
                    full_version['uuid'] = metadata_record['metadata-fr']['geonet:info']['uuid'] + '.full'
                    full_version['type'] = metadata_record['metadata-fr']['type']

                    msg = {'header': {'wfs_info': link, 'offset': 0, 'session_id': session_id, 'dest_index': dest_index}, 'body': full_version}

                    the_body = msgpack.packb(msg, use_bin_type=True)

                    channel.basic_publish( exchange=kwargs['exchange'],
                                           routing_key=kwargs['docs_to_enrich_rk'],
                                           body=the_body,
                                           properties=pika.BasicProperties(delivery_mode=2) )

                    break

        if not wfs_found:

            meta_version = metadata_record.copy()  # including metadata ONLY
            meta_version['uuid'] = metadata_record['metadata-fr']['geonet:info']['uuid'] + '.meta'
            meta_version['type'] = metadata_record['metadata-fr']['type']

            msg = {'header': {"index": {"_index": dest_index, "_type": "_doc"}}, 'body': meta_version}

            the_body = msgpack.packb(msg, use_bin_type=True)

            channel.basic_publish( exchange=kwargs['exchange'],
                                   routing_key=kwargs['docs_to_index_rk'],
                                   body=the_body,
                                   properties=pika.BasicProperties(delivery_mode=2) )

    channel.basic_ack(delivery_tag=method.delivery_tag)
    #channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)

    return
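
# Message shapes handled by process_page() (a sketch derived from the code above, not a
# formal schema; all messages are serialized with msgpack, use_bin_type=True / raw=False):
#
#   incoming (one page of GeoNetwork records):
#       {'header': {'geonetwork_root_url': ..., 'session_id': ..., 'dest_index': ...},
#        'body': [<record>, <record>, ...]}
#
#   outgoing, WFS link found (published with routing key docs_to_enrich_rk):
#       {'header': {'wfs_info': <link>, 'offset': 0, 'session_id': ..., 'dest_index': ...},
#        'body': <record with uuid suffixed by '.full'>}
#
#   outgoing, metadata only (published with routing key docs_to_index_rk):
#       {'header': {'index': {'_index': <dest_index>, '_type': '_doc'}},
#        'body': <record with uuid suffixed by '.meta'>}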


def main(cfg):

    from utils.close_connection import on_timeout

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    timeout = 5
    connection.add_timeout(timeout, on_timeout(connection))

    channel = connection.channel()
    exchange = cfg['rabbitmq']['exchange']

    # the queue this program will consume messages from:
    metadata_pages_to_process_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_1_suffix']

    docs_to_index_rk = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_2_suffix']
    docs_to_index_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_2_suffix']

    docs_to_enrich_rk = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_3_suffix']
    docs_to_enrich_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_3_suffix']

    channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct')

    channel.queue_declare(queue=docs_to_index_qn, durable=True)
    channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk)

    channel.queue_declare(queue=docs_to_enrich_qn, durable=True)
    channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk)

    working_directory = cfg['session']['working_directory']

    channel.basic_qos(prefetch_count=1)

    channel.basic_consume(lambda ch, method, properties, body:
                              process_page(ch, method, properties, body,
                                           exchange=exchange,
                                           docs_to_index_rk=docs_to_index_rk,
                                           docs_to_enrich_rk=docs_to_enrich_rk,
                                           working_directory=working_directory),
                          queue=metadata_pages_to_process_qn)

    channel.start_consuming()

    connection.close()


if __name__ == '__main__':

    import yaml
    import signal

    signal.signal(signal.SIGINT, exit_gracefully)

    with open("config.yaml", 'r') as yamlfile:
        cfg = yaml.safe_load(yamlfile)

    logging.info('Starting...')

    while True:
        try:
            main(cfg)
        except pika.exceptions.ChannelClosed:
            logging.info('Waiting for tasks...')
            time.sleep(5)
        except Exception as e:
            logging.error(e)
            exit(1)
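
# For reference, main() expects a config.yaml along these lines (a sketch inferred from
# the keys accessed above; the values shown are purely hypothetical placeholders):
#
#   rabbitmq:
#     host: rabbitmq.example.org
#     exchange: some_exchange
#     queue_name_1_suffix: metadata_pages_to_process
#     queue_name_2_suffix: docs_to_index
#     queue_name_3_suffix: docs_to_enrich
#     routing_key_2_suffix: docs_to_index
#     routing_key_3_suffix: docs_to_enrich
#   session:
#     id: some_session_id
#     working_directory: /tmp/some_working_dir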