# Newer
# Older
import urllib
import json
from copy import deepcopy
from pymongo import MongoClient
from collections import OrderedDict
from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
# Alessandro Cerioni committed
def filter_function( x, the_uuids_to_filter_out ):
    """
    Tell whether a metadata record should be kept.

    :param x: a metadata record, i.e. a mapping exposing its identifier
              under x['geonet:info']['uuid'].
    :param the_uuids_to_filter_out: collection of uuids to reject, or None
              to reject nothing.
    :return: True if the record's uuid is not among the uuids to filter out.
    """
    # A missing filter means "keep everything" rather than a TypeError on `in`.
    if the_uuids_to_filter_out is None:
        return True

    return x['geonet:info']['uuid'] not in the_uuids_to_filter_out
def get_pages( root_url, no_records_per_page, uuid=None, the_filter=None, username=None, password=None ):
    """
    Generator yielding, page by page, the metadata records returned by
    GeoSource's `q` service.

    Each record is enriched with an 'href' key holding a URL that queries
    the service for that single record (same parameters as the page query,
    minus paging and sorting, plus the record's own uuid).

    :param root_url: URL of the `q` service.
    :param no_records_per_page: number of records requested per page.
    :param uuid: if not None, restrict the query to this uuid.
    :param the_filter: if not None, collection of uuids to drop from each
                       yielded page.
    :param username: used, together with password, for HTTP authentication;
                     authentication is only sent when both are not None.
    :param password: see username.
    :return: yields one list of records per page (after filtering).
    :raises RecordNotFound: when the JSON response has no 'metadata' key.
    """
    # Imported explicitly: a bare `import urllib` does not guarantee that
    # the `urllib.parse` submodule is loaded.
    from urllib.parse import urlencode

    params = OrderedDict()

    # GeoSource's q service
    params['_content_type'] = 'json'
    params['resultType'] = 'details'
    params['sortBy'] = 'source' # N.B.: sort by title yields duplicates !
    params['fast'] = 'index'
    params['buildSummary'] = 'false'

    if uuid is not None:
        params['uuid'] = uuid

    # Parameters of the per-record hrefs. They do not depend on the page,
    # so they are prepared once, before 'from'/'to' are added to params.
    href_params = params.copy()
    del href_params['sortBy']

    # requests treats auth=None as "no authentication"
    auth = None
    if username is not None and password is not None:
        auth = (username, password)

    cnt = 0

    while True:

        params['from'] = 1 + cnt*no_records_per_page
        params['to'] = params['from'] + no_records_per_page - 1

        logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'],params['to']))

        res = requests.get(root_url, params=params, auth=auth)
        logging.debug(res.url)

        # NOTE(review): when the total number of records is an exact multiple
        # of the page size, one more request is issued past the last record.
        # If the service then omits 'metadata', this raises instead of ending
        # the iteration -- TODO confirm against the service's behaviour.
        payload = res.json() # parse the response body only once
        try:
            metadata = payload['metadata']
        except KeyError as e:
            raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists and that you have the proper access rights?' % uuid) from e

        # a single result comes back as a bare object rather than a list
        records = metadata if isinstance(metadata, list) else [metadata]

        logging.debug("Got %s records." % len(records))

        # adding hrefs...
        for record in records:
            href_params['uuid'] = record['geonet:info']['uuid']
            record['href'] = root_url + "?" + urlencode(href_params)

        # apply filter
        filtered_records = records

        if the_filter is not None:
            logging.debug("Filtering out unwanted records, if present.")
            filtered_records = [x for x in records if x['geonet:info']['uuid'] not in the_filter]

        yield filtered_records

        # The end-of-results test uses the unfiltered count on purpose:
        # filtering must not be mistaken for a short (i.e. last) page.
        if len(records) < no_records_per_page:
            break # it means that we have reached the last page

        cnt += 1
# def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
def send_page( the_cfg, the_page ):
    """
    This function sends a page of GeoNetwork results to a RabbitMQ queue.

    The message is a msgpack-encoded mapping with the configuration under
    ['header']['cfg'] and the page under ['body']. It is published as a
    persistent message (delivery_mode = 2) to a durable queue bound to a
    direct exchange. Queue name and routing key are the session id joined by
    '_' to the configured suffixes.

    :param the_cfg: configuration mapping; it is deep-copied, so the caller's
                    object is left untouched. Reads the 'rabbitmq' section
                    (host, port, exchange, queue_name_1_suffix,
                    routing_key_1_suffix) and ['session']['id'].
    :param the_page: the page of records to publish as the message body.
    """
    the_cfg = deepcopy(the_cfg)

    # Read the whole configuration before connecting, so that a missing key
    # cannot leave an open connection behind.
    rabbitmq_cfg = the_cfg['rabbitmq']
    host = rabbitmq_cfg['host']
    port = rabbitmq_cfg['port']
    exchange = rabbitmq_cfg['exchange']
    session_id = the_cfg['session']['id']

    # The two suffixes are removed from the copy of the configuration that
    # travels in the message header.
    queue_name = session_id + '_' + rabbitmq_cfg.pop('queue_name_1_suffix')
    routing_key = session_id + '_' + rabbitmq_cfg.pop('routing_key_1_suffix')

    msg = {
        'header': {'cfg': the_cfg},
        'body': the_page,
    }
    the_body = msgpack.packb(msg, use_bin_type=False)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port))
    try:
        channel = connection.channel()

        channel.exchange_declare(exchange=exchange, exchange_type='direct')
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

        channel.basic_publish( exchange=exchange,
                               routing_key=routing_key,
                               body=the_body,
                               properties=pika.BasicProperties(delivery_mode = 2)
                             )
    finally:
        # close the connection even if declaring or publishing failed
        connection.close()
def main(cfg):
    """
    Run an indexing session: fetch metadata pages with get_pages() and
    forward each of them with send_page().

    The pages of every uuid listed in cfg['metadata_getter']['uuids_to_get']
    are fetched first, then the pages of the unrestricted query. The uuids in
    cfg['metadata_getter']['uuids_to_filter_out'] are dropped from all pages.

    :param cfg: configuration mapping; it is deep-copied, so the caller's
                object is left untouched. Reads ['session']['id'], the
                'metadata_getter' section and the 'geonetwork' section
                (url, records_per_page, username, password).
    """
    cfg = deepcopy(cfg)

    logging.info('Starting indexing session: %s.' % cfg['session']['id'])

    # The getter settings and the credentials are removed from the working
    # copy, as this info is no longer needed once extracted; this is the copy
    # handed to send_page() along with each page.
    getter_cfg = cfg.pop('metadata_getter')
    uuids_to_get = getter_cfg['uuids_to_get'] or () # tolerate an empty (None) setting
    uuids_to_filter_out = getter_cfg['uuids_to_filter_out']

    username = cfg['geonetwork'].pop('username')
    password = cfg['geonetwork'].pop('password')

    def _forward_pages(uuid):
        # fetch every page matching `uuid` (None = no restriction) and send it
        for page in get_pages( cfg['geonetwork']['url'],
                               cfg['geonetwork']['records_per_page'],
                               uuid=uuid,
                               the_filter=uuids_to_filter_out,
                               username=username, password=password ):
            send_page(cfg, page)

    # get some datasets only
    for uuid_to_get in uuids_to_get:
        _forward_pages(uuid_to_get)

    # get all the datasets
    # NOTE(review): this runs even when specific uuids were requested above,
    # so those records are forwarded twice -- confirm that this is intended.
    _forward_pages(None)
if __name__ == '__main__':
import yaml
import signal