Skip to content
Snippets Groups Projects
Commit 061895f5 authored by Alessandro Cerioni's avatar Alessandro Cerioni
Browse files

A few improvements.

- Introduced a template entry for the nested datatype (used, for the time being, for "responsibleParty").
- Enriched metadata with the projections per service and bounding boxes per projection.
- Split 'protocol' into 'formats' and 'service'.
- Added a timeout to the connections to RabbitMQ.
parent eb0262f7
No related branches found
No related tags found
No related merge requests found
...@@ -10,6 +10,7 @@ from utils.exit_gracefully import exit_gracefully ...@@ -10,6 +10,7 @@ from utils.exit_gracefully import exit_gracefully
import re import re
from utils.my_logging import logging from utils.my_logging import logging
from utils.fix_links import fix_links from utils.fix_links import fix_links
from utils.enrich_links import enrich_links
def list_to_dictlist( the_input, the_context=None ): def list_to_dictlist( the_input, the_context=None ):
...@@ -121,7 +122,7 @@ def list_to_dictlist( the_input, the_context=None ): ...@@ -121,7 +122,7 @@ def list_to_dictlist( the_input, the_context=None ):
return the_output return the_output
def process_records( in_records, geonetwork_root_url ): def process_records( in_records, geonetwork_root_url, working_directory ):
#print( in_records[0].keys() ) #print( in_records[0].keys() )
...@@ -145,7 +146,7 @@ def process_records( in_records, geonetwork_root_url ): ...@@ -145,7 +146,7 @@ def process_records( in_records, geonetwork_root_url ):
#exit(1) #exit(1)
del out_record['metadata-fr']['link'] del out_record['metadata-fr']['link']
tmp = list_to_dictlist(in_record['link'], 'link')#links tmp = list_to_dictlist(in_record['link'], 'link')#links
out_record['metadata-fr']['link'] = fix_links(tmp) out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp), working_directory )
del out_record['metadata-fr']['userinfo'] del out_record['metadata-fr']['userinfo']
out_record['metadata-fr']['userinfo'] = list_to_dictlist(in_record['userinfo'])#links out_record['metadata-fr']['userinfo'] = list_to_dictlist(in_record['userinfo'])#links
...@@ -251,9 +252,8 @@ def process_page( channel, method, properties, body, **kwargs): ...@@ -251,9 +252,8 @@ def process_page( channel, method, properties, body, **kwargs):
page = decoded_body['body'] page = decoded_body['body']
out_records = process_records( page, geonetwork_root_url ) out_records = process_records( page, geonetwork_root_url, kwargs['working_directory'] )
# print(out_records[0]) #print(json.dumps(out_records[0], indent=4))
# time.sleep(1)
#dispatch #dispatch
for metadata_record in out_records: for metadata_record in out_records:
...@@ -269,7 +269,7 @@ def process_page( channel, method, properties, body, **kwargs): ...@@ -269,7 +269,7 @@ def process_page( channel, method, properties, body, **kwargs):
for link in metadata_record['metadata-fr']['link']: for link in metadata_record['metadata-fr']['link']:
if 'protocol' in link.keys() and link['protocol'] == 'OGC:WFS': if 'service' in link.keys() and link['service'] == 'WFS':
logging.debug('EUREKA : found a WFS ressource!') logging.debug('EUREKA : found a WFS ressource!')
wfs_found = True wfs_found = True
#documents_to_index = [] #documents_to_index = []
...@@ -306,12 +306,15 @@ def process_page( channel, method, properties, body, **kwargs): ...@@ -306,12 +306,15 @@ def process_page( channel, method, properties, body, **kwargs):
return return
def main(cfg): def main(cfg):
logging.debug(cfg) from utils.close_connection import on_timeout
#logging.debug(cfg)
#global connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
timeout = 5
connection.add_timeout(timeout, on_timeout(connection))
channel = connection.channel() channel = connection.channel()
exchange = cfg['rabbitmq']['exchange'] exchange = cfg['rabbitmq']['exchange']
# the queue this program will consume messages from: # the queue this program will consume messages from:
...@@ -329,14 +332,17 @@ def main(cfg): ...@@ -329,14 +332,17 @@ def main(cfg):
channel.queue_declare(queue=docs_to_enrich_qn, durable=True) channel.queue_declare(queue=docs_to_enrich_qn, durable=True)
channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk) channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk)
logging.info('Waiting for messages...') working_directory = cfg['session']['working_directory']
#logging.info('Waiting for messages...')
channel.basic_qos(prefetch_count=1) channel.basic_qos(prefetch_count=1)
# channel.basic_consume(lambda ch, method, properties, body: process_page(ch, method, properties, body, out_channel, out_exchange, out_routing_key1), queue=queue_name)#, no_ack=True) # channel.basic_consume(lambda ch, method, properties, body: process_page(ch, method, properties, body, out_channel, out_exchange, out_routing_key1), queue=queue_name)#, no_ack=True)
channel.basic_consume(lambda ch, method, properties, body: process_page(ch, method, properties, body, channel.basic_consume(lambda ch, method, properties, body: process_page(ch, method, properties, body,
exchange=exchange, exchange=exchange,
docs_to_index_rk=docs_to_index_rk, docs_to_index_rk=docs_to_index_rk,
docs_to_enrich_rk=docs_to_enrich_rk), docs_to_enrich_rk=docs_to_enrich_rk,
working_directory=working_directory),
queue=metadata_pages_to_process_qn)#, no_ack=True) queue=metadata_pages_to_process_qn)#, no_ack=True)
channel.start_consuming() channel.start_consuming()
......
...@@ -136,7 +136,12 @@ def enrich_docs( channel, method, properties, body, **kwargs ): ...@@ -136,7 +136,12 @@ def enrich_docs( channel, method, properties, body, **kwargs ):
def main(cfg): def main(cfg):
from utils.close_connection import on_timeout
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
timeout = 5
connection.add_timeout(timeout, on_timeout(connection))
channel = connection.channel() channel = connection.channel()
exchange = cfg['rabbitmq']['exchange'] exchange = cfg['rabbitmq']['exchange']
# the queue this program will consume messages from: # the queue this program will consume messages from:
......
...@@ -41,7 +41,11 @@ def process_doc_pages( channel, method, properties, body, mongodb ): ...@@ -41,7 +41,11 @@ def process_doc_pages( channel, method, properties, body, mongodb ):
def main(cfg): def main(cfg):
from utils.close_connection import on_timeout
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
timeout = 5
connection.add_timeout(timeout, on_timeout(connection))
channel = connection.channel() channel = connection.channel()
exchange = cfg['rabbitmq']['exchange'] exchange = cfg['rabbitmq']['exchange']
......
...@@ -98,12 +98,18 @@ def process_docs( channel, method, properties, body, **kwargs ): ...@@ -98,12 +98,18 @@ def process_docs( channel, method, properties, body, **kwargs ):
def main(cfg): def main(cfg):
import os import os
from utils.close_connection import on_timeout
filename = os.path.join( cfg['session']['working_directory'], cfg['session']['id'] + '_' + 'field_types.json' ) filename = os.path.join( cfg['session']['working_directory'], cfg['session']['id'] + '_' + 'field_types.json' )
with open(filename, 'r') as fp: with open(filename, 'r') as fp:
field_types = json.load(fp) field_types = json.load(fp)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
timeout = 5
connection.add_timeout(timeout, on_timeout(connection))
channel = connection.channel() channel = connection.channel()
exchange = cfg['rabbitmq']['exchange'] exchange = cfg['rabbitmq']['exchange']
......
...@@ -36,7 +36,7 @@ def tag_doc( the_doc ): ...@@ -36,7 +36,7 @@ def tag_doc( the_doc ):
if 'link' in the_doc['metadata-fr'].keys(): if 'link' in the_doc['metadata-fr'].keys():
for link in the_doc['metadata-fr']['link']: for link in the_doc['metadata-fr']['link']:
#print(link) #print(link)
if any( [x in link['protocol'] for x in ['OGC:WFS', 'OGC:WMS', 'KML', 'WS']] ): if 'service' in link.keys() and any( [x in link['service'] for x in ['WFS', 'WMS', 'KML', 'WS']] ):
tag_dict['isQueryable'] = True tag_dict['isQueryable'] = True
break break
...@@ -163,12 +163,18 @@ def index_docs(channel, method, properties, body, es): ...@@ -163,12 +163,18 @@ def index_docs(channel, method, properties, body, es):
def main(cfg): def main(cfg):
from utils.close_connection import on_timeout
es = Elasticsearch([cfg['indexer']['url']], timeout=60) es = Elasticsearch([cfg['indexer']['url']], timeout=60)
es_logger = logging.getLogger('elasticsearch') es_logger = logging.getLogger('elasticsearch')
es_logger.setLevel(logging.INFO) es_logger.setLevel(logging.INFO)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'])) connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
timeout = 5
connection.add_timeout(timeout, on_timeout(connection))
channel = connection.channel() channel = connection.channel()
exchange = cfg['rabbitmq']['exchange'] exchange = cfg['rabbitmq']['exchange']
......
...@@ -137,6 +137,20 @@ template = { ...@@ -137,6 +137,20 @@ template = {
} }
} }
}, },
{
"nested-template": {
"path_match": "metadata-fr.responsibleParty",
"mapping": {
"type": "nested"
# "fields": {
# "sort":
# {
# "type": "boolean"
# }
# }
}
}
},
{ {
"unindexed-path-template": { "unindexed-path-template": {
"match_pattern": "regex", "match_pattern": "regex",
......
...@@ -3,6 +3,8 @@ from pprint import pprint ...@@ -3,6 +3,8 @@ from pprint import pprint
import json import json
import requests import requests
from .my_logging import logging from .my_logging import logging
#import logging
def translate_content_type( content_type ): def translate_content_type( content_type ):
...@@ -19,6 +21,37 @@ def translate_content_type( content_type ): ...@@ -19,6 +21,37 @@ def translate_content_type( content_type ):
return output return output
def protocol_to_formats_and_services( links ):
    """Replace each link's 'protocol' entry with 'formats' and 'service' entries.

    Known geo protocols are mapped to the list of output formats they can
    serve plus a short service name; any unknown protocol becomes a
    single-element 'formats' list and no 'service' key is added.

    NOTE: the returned list is a shallow copy of *links*, so the link dicts
    themselves are modified in place (the 'protocol' key is removed from
    the caller's dicts as well).

    :param links: list of dicts, each carrying at least a 'protocol' key
    :return: the same link dicts, with 'protocol' replaced by
             'formats' (list of str) and optionally 'service' (str)
    """
    # protocol -> (formats, service); service is None for the fallback case.
    protocol_map = {
        'OGC:WMS': (['PNG', 'JPEG'], 'WMS'),
        'OGC:WFS': (['GML', 'GeoJSON', 'ShapeFile'], 'WFS'),
        'OGC:WCS': (['TIFF'], 'WCS'),
        'KML':     (['KML'], 'KML'),
        'WS':      (['JSON', 'ShapeFile'], 'WS'),
        'SOS':     (['JSON', 'XML'], 'SOS'),
    }

    output = links.copy()

    for k, link in enumerate(links):
        formats, service = protocol_map.get(link['protocol'],
                                            ([link['protocol']], None))
        # Copy the format list so callers cannot mutate the shared table.
        output[k]['formats'] = list(formats)
        if service is not None:
            output[k]['service'] = service
        del output[k]['protocol']

    return output
def fix_links( links ): def fix_links( links ):
...@@ -62,8 +95,6 @@ def fix_links( links ): ...@@ -62,8 +95,6 @@ def fix_links( links ):
fixed_links[k]['protocol'] = known_format.upper() fixed_links[k]['protocol'] = known_format.upper()
continue continue
#pprint(fixed_links)
# FIX TIF -> TIFF # FIX TIF -> TIFF
for k, link in enumerate(fixed_links): for k, link in enumerate(fixed_links):
...@@ -104,7 +135,7 @@ def fix_links( links ): ...@@ -104,7 +135,7 @@ def fix_links( links ):
fixed_links[k]['url'] = link['url'].split('?')[0] fixed_links[k]['url'] = link['url'].split('?')[0]
return fixed_links return protocol_to_formats_and_services(fixed_links)
if __name__ == '__main__': if __name__ == '__main__':
...@@ -190,9 +221,16 @@ if __name__ == '__main__': ...@@ -190,9 +221,16 @@ if __name__ == '__main__':
"protocol": "WWW-LINK", "protocol": "WWW-LINK",
"content-type": "WWW:LINK", "content-type": "WWW:LINK",
"unknown": "0" "unknown": "0"
},
{
"name": "MNT2009_ombrage_10m_CC46",
"description": "Ombrage du relief du Grand Lyon 2009",
"url": "https://download.data.grandlyon.com/wcs/grandlyon",
"protocol": "OGC:WCS",
"content-type": "OGC:WCS",
"unknown": "1"
} }
] ]
fixed_links = fix_links(links) fixed_links = fix_links(links)
pprint(fixed_links) pprint(fixed_links)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment