From 061895f5578140fdf7e14da904ce1883dc7a9469 Mon Sep 17 00:00:00 2001 From: Alessandro Cerioni <acerioni@grandlyon.com> Date: Fri, 25 Jan 2019 19:18:00 +0100 Subject: [PATCH] A few improvements. - Introduced a template entry for the nested datatype (used, for the time being, for "responsibleParty"). - Enriched metadata with the projections per service and bounding boxes per projection. - Split 'protocol' into 'formats' and 'service'. - Added a timeout to the connections to RabbitMQ. --- 2-metadata-processor.py | 26 ++++++++++++++--------- 3-doc-enricher.py | 5 +++++ 4-docs-to-mongodb.py | 4 ++++ 6-doc-processor.py | 6 ++++++ 7-doc-indexer.py | 8 ++++++- es_template.py | 14 +++++++++++++ utils/fix_links.py | 46 +++++++++++++++++++++++++++++++++++++---- 7 files changed, 94 insertions(+), 15 deletions(-) diff --git a/2-metadata-processor.py b/2-metadata-processor.py index 062cfd8..c81b52f 100644 --- a/2-metadata-processor.py +++ b/2-metadata-processor.py @@ -10,6 +10,7 @@ from utils.exit_gracefully import exit_gracefully import re from utils.my_logging import logging from utils.fix_links import fix_links +from utils.enrich_links import enrich_links def list_to_dictlist( the_input, the_context=None ): @@ -121,7 +122,7 @@ def list_to_dictlist( the_input, the_context=None ): return the_output -def process_records( in_records, geonetwork_root_url ): +def process_records( in_records, geonetwork_root_url, working_directory ): #print( in_records[0].keys() ) @@ -145,7 +146,7 @@ def process_records( in_records, geonetwork_root_url ): #exit(1) del out_record['metadata-fr']['link'] tmp = list_to_dictlist(in_record['link'], 'link')#links - out_record['metadata-fr']['link'] = fix_links(tmp) + out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp), working_directory ) del out_record['metadata-fr']['userinfo'] out_record['metadata-fr']['userinfo'] = list_to_dictlist(in_record['userinfo'])#links @@ -251,9 +252,8 @@ def process_page( channel, method, properties, body,
**kwargs): page = decoded_body['body'] - out_records = process_records( page, geonetwork_root_url ) - # print(out_records[0]) - # time.sleep(1) + out_records = process_records( page, geonetwork_root_url, kwargs['working_directory'] ) + #print(json.dumps(out_records[0], indent=4)) #dispatch for metadata_record in out_records: @@ -269,7 +269,7 @@ def process_page( channel, method, properties, body, **kwargs): for link in metadata_record['metadata-fr']['link']: - if 'protocol' in link.keys() and link['protocol'] == 'OGC:WFS': + if 'service' in link.keys() and link['service'] == 'WFS': logging.debug('EUREKA : found a WFS ressource!') wfs_found = True #documents_to_index = [] @@ -306,12 +306,15 @@ def process_page( channel, method, properties, body, **kwargs): return - def main(cfg): - logging.debug(cfg) + from utils.close_connection import on_timeout + #logging.debug(cfg) + #global connection connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) + timeout = 5 + connection.add_timeout(timeout, on_timeout(connection)) channel = connection.channel() exchange = cfg['rabbitmq']['exchange'] # the queue this program will consume messages from: @@ -329,14 +332,17 @@ def main(cfg): channel.queue_declare(queue=docs_to_enrich_qn, durable=True) channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk) - logging.info('Waiting for messages...') + working_directory = cfg['session']['working_directory'] + + #logging.info('Waiting for messages...') channel.basic_qos(prefetch_count=1) # channel.basic_consume(lambda ch, method, properties, body: process_page(ch, method, properties, body, out_channel, out_exchange, out_routing_key1), queue=queue_name)#, no_ack=True) channel.basic_consume(lambda ch, method, properties, body: process_page(ch, method, properties, body, exchange=exchange, docs_to_index_rk=docs_to_index_rk, - docs_to_enrich_rk=docs_to_enrich_rk), + docs_to_enrich_rk=docs_to_enrich_rk, + 
working_directory=working_directory), queue=metadata_pages_to_process_qn)#, no_ack=True) channel.start_consuming() diff --git a/3-doc-enricher.py b/3-doc-enricher.py index 6f6a396..affbc71 100644 --- a/3-doc-enricher.py +++ b/3-doc-enricher.py @@ -136,7 +136,12 @@ def enrich_docs( channel, method, properties, body, **kwargs ): def main(cfg): + from utils.close_connection import on_timeout + connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) + timeout = 5 + connection.add_timeout(timeout, on_timeout(connection)) + channel = connection.channel() exchange = cfg['rabbitmq']['exchange'] # the queue this program will consume messages from: diff --git a/4-docs-to-mongodb.py b/4-docs-to-mongodb.py index b2c757f..7a01961 100644 --- a/4-docs-to-mongodb.py +++ b/4-docs-to-mongodb.py @@ -41,7 +41,11 @@ def process_doc_pages( channel, method, properties, body, mongodb ): def main(cfg): + from utils.close_connection import on_timeout + connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) + timeout = 5 + connection.add_timeout(timeout, on_timeout(connection)) channel = connection.channel() exchange = cfg['rabbitmq']['exchange'] diff --git a/6-doc-processor.py b/6-doc-processor.py index 8b15659..2ce2c96 100644 --- a/6-doc-processor.py +++ b/6-doc-processor.py @@ -98,12 +98,18 @@ def process_docs( channel, method, properties, body, **kwargs ): def main(cfg): import os + + from utils.close_connection import on_timeout + filename = os.path.join( cfg['session']['working_directory'], cfg['session']['id'] + '_' + 'field_types.json' ) with open(filename, 'r') as fp: field_types = json.load(fp) connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) + timeout = 5 + connection.add_timeout(timeout, on_timeout(connection)) + channel = connection.channel() exchange = cfg['rabbitmq']['exchange'] diff --git a/7-doc-indexer.py b/7-doc-indexer.py index 9e49bca..6285613 100644 
--- a/7-doc-indexer.py +++ b/7-doc-indexer.py @@ -36,7 +36,7 @@ def tag_doc( the_doc ): if 'link' in the_doc['metadata-fr'].keys(): for link in the_doc['metadata-fr']['link']: #print(link) - if any( [x in link['protocol'] for x in ['OGC:WFS', 'OGC:WMS', 'KML', 'WS']] ): + if 'service' in link.keys() and any( [x in link['service'] for x in ['WFS', 'WMS', 'KML', 'WS']] ): tag_dict['isQueryable'] = True break @@ -163,12 +163,18 @@ def index_docs(channel, method, properties, body, es): def main(cfg): + from utils.close_connection import on_timeout + es = Elasticsearch([cfg['indexer']['url']], timeout=60) es_logger = logging.getLogger('elasticsearch') es_logger.setLevel(logging.INFO) + connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) + timeout = 5 + connection.add_timeout(timeout, on_timeout(connection)) + channel = connection.channel() exchange = cfg['rabbitmq']['exchange'] diff --git a/es_template.py b/es_template.py index 617e943..fb5706f 100644 --- a/es_template.py +++ b/es_template.py @@ -137,6 +137,20 @@ template = { } } }, + { + "nested-template": { + "path_match": "metadata-fr.responsibleParty", + "mapping": { + "type": "nested" + # "fields": { + # "sort": + # { + # "type": "boolean" + # } + # } + } + } + }, { "unindexed-path-template": { "match_pattern": "regex", diff --git a/utils/fix_links.py b/utils/fix_links.py index 2906d65..8c05149 100644 --- a/utils/fix_links.py +++ b/utils/fix_links.py @@ -3,6 +3,8 @@ from pprint import pprint import json import requests from .my_logging import logging +#import logging + def translate_content_type( content_type ): @@ -19,6 +21,37 @@ def translate_content_type( content_type ): return output +def protocol_to_formats_and_services( links ): + + output = links.copy() + + for k, link in enumerate(links): + + if link['protocol'] == 'OGC:WMS': + output[k]['formats'] = ['PNG', 'JPEG'] + output[k]['service'] = 'WMS' + elif link['protocol'] == 'OGC:WFS': + output[k]['formats'] = ['GML', 
'GeoJSON', 'ShapeFile'] + output[k]['service'] = 'WFS' + elif link['protocol'] == 'OGC:WCS': + output[k]['formats'] = ['TIFF'] + output[k]['service'] = 'WCS' + elif link['protocol'] == 'KML': + output[k]['formats'] = ['KML'] + output[k]['service'] = 'KML' + elif link['protocol'] == 'WS': + output[k]['formats'] = ['JSON', 'ShapeFile'] + output[k]['service'] = 'WS' + elif link['protocol'] == 'SOS': + output[k]['formats'] = ['JSON', 'XML'] + output[k]['service'] = 'SOS' + else: + output[k]['formats'] = [ link['protocol'] ] + + del output[k]['protocol'] + + return output + def fix_links( links ): @@ -62,8 +95,6 @@ def fix_links( links ): fixed_links[k]['protocol'] = known_format.upper() continue - #pprint(fixed_links) - # FIX TIF -> TIFF for k, link in enumerate(fixed_links): @@ -104,7 +135,7 @@ def fix_links( links ): fixed_links[k]['url'] = link['url'].split('?')[0] - return fixed_links + return protocol_to_formats_and_services(fixed_links) if __name__ == '__main__': @@ -190,9 +221,16 @@ if __name__ == '__main__': "protocol": "WWW-LINK", "content-type": "WWW:LINK", "unknown": "0" + }, + { + "name": "MNT2009_ombrage_10m_CC46", + "description": "Ombrage du relief du Grand Lyon 2009", + "url": "https://download.data.grandlyon.com/wcs/grandlyon", + "protocol": "OGC:WCS", + "content-type": "OGC:WCS", + "unknown": "1" } ] fixed_links = fix_links(links) - pprint(fixed_links) -- GitLab