# Metadata getter: fetches metadata records from GeoNetwork and publishes them, page by page, to RabbitMQ.
import pika
import msgpack
import requests
from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
from pymongo import MongoClient
class RecordNotFound(Exception):
    """
    Raised when the catalogue's response carries no 'metadata' entry for the request.
    """
# Helpers used to filter and fetch pages of metadata records.
def filter_function(x, the_uuids_to_filter_out):
    """
    Tell whether a metadata record should be kept.

    :param x: a record, read as x['geonet:info']['uuid']
    :param the_uuids_to_filter_out: container of uuids to reject
    :return: True if the record's uuid is not among the uuids to filter out, False otherwise
    """
    record_uuid = x['geonet:info']['uuid']
    return record_uuid not in the_uuids_to_filter_out
def get_pages(root_url, no_records_per_page, uuid=None, the_filter=None, username=None, password=None):
    """
    Generator yielding the metadata records returned by GeoSource's q service, one page at a time.

    :param root_url: URL of the q service, queried with HTTP GET
    :param no_records_per_page: number of records requested per page
    :param uuid: if given, sent as the 'uuid' query parameter to restrict the search
    :param the_filter: optional container of uuids; records whose
                       x['geonet:info']['uuid'] is in it are dropped from the yielded pages
    :param username: optional username; HTTP auth is used only if both username and password are given
    :param password: optional password (see username)
    :return: yields one list of records per page; iteration stops after the first page
             holding fewer than no_records_per_page records (before filtering)
    :raises RecordNotFound: if a response has no 'metadata' key
    """
    # GeoSource's q service
    params = {
        '_content_type': 'json',
        'resultType': 'details',
        'sortBy': 'source',  # N.B.: sort by title yields duplicates !
        'fast': 'index',
        'buildSummary': 'false',
    }

    if uuid is not None:
        params['uuid'] = uuid

    # credentials are only sent when both are provided; auth=None is requests' default
    if username is not None and password is not None:
        auth = (username, password)
    else:
        auth = None

    cnt = 0

    while True:

        params['from'] = 1 + cnt*no_records_per_page
        params['to'] = params['from'] + no_records_per_page - 1

        logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'],params['to']))

        res = requests.get(root_url, params=params, auth=auth)
        logging.debug(res.url)

        # decode the body once instead of re-parsing it at every access
        payload = res.json()

        try:
            metadata = payload['metadata']
        except KeyError as e:
            raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists and that you have the proper access rights?' % uuid)

        # 'metadata' is either a list of records or a single record: normalize to a list
        if isinstance(metadata, list):
            records = metadata
        else:
            records = [metadata]

        logging.debug("Got %s records." % len(records))

        # apply filter
        if the_filter is not None:
            logging.debug("Filtering out unwanted records, if present.")
            filtered_records = [x for x in records if x['geonet:info']['uuid'] not in the_filter]
        else:
            # no filter requested: yield the page untouched
            # (this name used to be left unbound here, making the yield below fail)
            filtered_records = records

        yield filtered_records

        # the page size test uses the unfiltered records, since filtering may shrink a full page
        if len(records) < no_records_per_page:
            break # it means that we have reached the last page

        cnt += 1
def send_page(the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key):
    """
    Publish one page of GeoNetwork results to RabbitMQ.

    The page is wrapped into a message made of a 'header' (GeoNetwork root URL,
    session id, destination index) and a 'body' (the page itself), serialized
    with msgpack and published on the given channel.

    :param the_session_id: stored in the header as 'session_id'
    :param the_geonetwork_root_url: stored in the header as 'geonetwork_root_url'
    :param the_dest_index: stored in the header as 'dest_index'
    :param the_page: the records to send, stored as the message 'body'
    :param the_channel: channel object whose basic_publish() is called
    :param the_exchange: exchange to publish to
    :param the_routing_key: routing key to publish with
    """
    header = {
        'geonetwork_root_url': the_geonetwork_root_url,
        'session_id': the_session_id,
        'dest_index': the_dest_index,
    }
    payload = msgpack.packb({'header': header, 'body': the_page}, use_bin_type=False)

    # delivery_mode=2 is AMQP's "persistent" delivery mode
    message_properties = pika.BasicProperties(delivery_mode=2)

    the_channel.basic_publish(
        exchange=the_exchange,
        routing_key=the_routing_key,
        body=payload,
        properties=message_properties,
    )
def main(cfg):
    """
    Run one indexing session: record it in MongoDB, then fetch the metadata
    pages from GeoNetwork and publish each of them to RabbitMQ.

    :param cfg: configuration dictionary; the keys read here are
                cfg['session']['id'],
                cfg['mongo'][username|password|host|port|report-db],
                cfg['rabbitmq'][host|exchange|queue_name_1_suffix|routing_key_1_suffix],
                cfg['metadata_getter'][uuids_to_get|uuids_to_filter_out],
                cfg['geonetwork'][url|records_per_page|username|password],
                cfg['indexer']['index']
    :raises SystemExit: (exit status 1) if MongoDB does not acknowledge the session document
    """
    import datetime
    import sys

    session_id = cfg['session']['id']

    logging.info('Starting indexing session: %s.' % session_id)

    # logging the indexing session into MongoDB
    mongo_client = MongoClient('mongodb://%s:%s@%s:%s/' % (cfg['mongo']['username'],
                                                           cfg['mongo']['password'],
                                                           cfg['mongo']['host'],
                                                           cfg['mongo']['port']))

    mongo_db = mongo_client[cfg['mongo']['report-db']]
    collection = mongo_db['indexing-sessions']

    # NOTE(review): the whole configuration, credentials included, is stored in the report database
    doc = {
        '_id': session_id,
        'started_at': datetime.datetime.utcnow(),
        'configuration': cfg,
    }

    res = collection.replace_one({'_id': doc['_id']}, doc, upsert=True)
    if not res.acknowledged:
        logging.error("Unable to push the following doc to MongoDB:")
        logging.error(doc)
        sys.exit(1)
    # ------------------------------------------------------------------

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))

    try:
        channel = connection.channel()
        exchange = cfg['rabbitmq']['exchange']

        queue_name  = session_id + '_' + cfg['rabbitmq']['queue_name_1_suffix']
        routing_key = session_id + '_' + cfg['rabbitmq']['routing_key_1_suffix']

        channel.exchange_declare(exchange=exchange, exchange_type='direct')
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

        uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
        uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']
        username            = cfg['geonetwork']['username']
        password            = cfg['geonetwork']['password']

        # Fetch only the requested uuids; when none is requested, fetch everything (uuid=None).
        # A "for ... else" used to be here: without a break its else branch always runs,
        # so the full catalogue was fetched even after the requested uuids had been sent.
        for uuid_to_get in (uuids_to_get or [None]):
            for page in get_pages(cfg['geonetwork']['url'], cfg['geonetwork']['records_per_page'],
                                  uuid=uuid_to_get, the_filter=uuids_to_filter_out,
                                  username=username, password=password):
                # publish with the routing key the queue was bound with: on a direct
                # exchange, a message published with another key does not reach the queue
                send_page(session_id, cfg['geonetwork']['url'], cfg['indexer']['index'],
                          page, channel, exchange, routing_key)
    finally:
        # release the RabbitMQ connection even if fetching or publishing failed;
        # the is_open guard avoids masking the original error when the connection is already down
        if connection.is_open:
            connection.close()
if __name__ == '__main__':

    import yaml
    import signal

    # let Ctrl-C be handled by the project's exit_gracefully handler
    signal.signal(signal.SIGINT, exit_gracefully)

    # read 'n' parse the configuration
    with open("config.yaml", 'r') as yamlfile:
        # safe_load instead of load: yaml.load() without an explicit Loader is rejected
        # by PyYAML >= 6 and, on older versions, can construct arbitrary Python objects
        cfg = yaml.safe_load(yamlfile)

    try:
        main(cfg)
    except Exception as e:
        # top-level boundary: report the failure through the logger
        logging.error(e)