Newer
Older
import pika
import msgpack
import requests
from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
from pymongo import MongoClient
class RecordNotFound(Exception):
    """Raised by get_pages when the GeoNetwork response for a requested uuid carries no 'metadata'."""
def filter_function( x, the_uuids_to_filter_out ):
    """
    Decide whether a GeoNetwork record survives filtering.

    :param x: a record dict exposing its uuid under x['geonet:info']['uuid'].
    :param the_uuids_to_filter_out: container of uuids to drop.
    :return: True to keep the record, False if its uuid is listed for removal.
    """
    record_uuid = x['geonet:info']['uuid']
    return not (record_uuid in the_uuids_to_filter_out)
def get_pages( root_url, no_records_per_page, uuid=None, the_filter=None ):
    """
    Iterate over the metadata records of a GeoNetwork/GeoSource 'q' service, one page at a time.

    :param root_url: URL of the 'q' search service.
    :param no_records_per_page: number of records requested per HTTP call.
    :param uuid: if given, only the record with this uuid is requested.
    :param the_filter: optional container of uuids whose records are dropped from every page
                       (None disables filtering).
    :yield: for each page, the list of record dicts left after filtering (possibly empty).
    :raises RecordNotFound: if `uuid` is given and the first response has no 'metadata' key.
    :raises requests.HTTPError: if the service answers with an HTTP error status.
    """
    params = {}
    # GeoSource's q service
    params['_content_type'] = 'json'
    params['resultType'] = 'details'
    params['sortBy'] = 'source' # N.B.: sort by title yields duplicates !
    params['fast'] = 'index'
    params['buildSummary'] = 'false'

    if uuid is not None:
        params['uuid'] = uuid

    cnt = 0
    while True:
        # 1-based, inclusive record window for this page
        params['from'] = 1 + cnt*no_records_per_page
        params['to'] = params['from'] + no_records_per_page - 1

        logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'],params['to']))

        res = requests.get(root_url, params=params)
        logging.debug(res.url)
        res.raise_for_status()
        payload = res.json()  # parse once, not once per access

        try:
            metadata = payload['metadata']
        except KeyError as e:
            # Only a missing first page for an explicit uuid means "record not found";
            # otherwise we have simply requested past the last record.
            if uuid is not None and cnt == 0:
                raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists?' % uuid) from e
            break

        # a single hit comes back as a bare dict rather than a one-element list
        records = metadata if isinstance(metadata, list) else [metadata]
        logging.debug("Got %s records." % len(records))

        # apply filter
        if the_filter is not None:
            logging.debug("Filtering out unwanted records, if present.")
            page = [x for x in records if filter_function(x, the_filter)]
        else:
            page = records

        yield page

        # pagination is decided on the unfiltered count, so filtering never ends the loop early
        if len(records) < no_records_per_page:
            break # it means that we have reached the last page
        cnt += 1
def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
    """
    Publish one page of GeoNetwork records to RabbitMQ.

    The page is wrapped in an envelope {'header': {...}, 'body': the_page}, serialized with
    msgpack (use_bin_type=True) and published on `the_exchange` with `the_routing_key`
    as a persistent message (delivery_mode=2).
    """
    header = {
        'geonetwork_root_url': the_geonetwork_root_url,
        'session_id': the_session_id,
        'dest_index': the_dest_index,
    }
    envelope = {'header': header, 'body': the_page}

    packed = msgpack.packb(envelope, use_bin_type=True)
    persistent = pika.BasicProperties(delivery_mode = 2)

    the_channel.basic_publish(exchange=the_exchange, routing_key=the_routing_key, body=packed, properties=persistent)
def main(cfg):
    """
    Run one indexing session.

    The session (id, start time, full configuration) is upserted into the MongoDB collection
    'indexing-sessions'. Every requested GeoNetwork page is then published to a durable
    RabbitMQ queue bound to a direct exchange.

    :param cfg: configuration mapping (as loaded from config.yaml); reads the 'session', 'mongo',
                'rabbitmq', 'metadata_getter', 'geonetwork' and 'indexer' sections.
    """
    import datetime
    import sys

    logging.info('Starting indexing session: %s.' % cfg['session']['id'])

    # logging the indexing session into MongoDB
    mongo_client = MongoClient('mongodb://%s:%s@%s:%s/' % (cfg['mongo']['username'],
                                                           cfg['mongo']['password'],
                                                           cfg['mongo']['host'],
                                                           cfg['mongo']['port']))
    try:
        collection = mongo_client[cfg['mongo']['report-db']]['indexing-sessions']

        doc = dict()
        doc['_id'] = cfg['session']['id']
        doc['started_at'] = datetime.datetime.now(datetime.timezone.utc)
        doc['configuration'] = cfg

        res = collection.replace_one({'_id': doc['_id']}, doc, upsert=True)
        if not res.acknowledged:
            logging.error("Unable to push the following doc to MongoDB:")
            logging.error(doc)
            sys.exit(1)
    finally:
        mongo_client.close()

    # ------------------------------------------------------------------

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    try:
        channel = connection.channel()
        exchange = cfg['rabbitmq']['exchange']
        queue_name = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_1_suffix']
        routing_key = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_1_suffix']

        channel.exchange_declare(exchange=exchange, exchange_type='direct')
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

        uuids_to_get = cfg['metadata_getter']['uuids_to_get']
        uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']

        # 'all' harvests the whole catalogue (uuid=None); otherwise each listed uuid is fetched.
        uuids = [None] if 'all' in uuids_to_get else uuids_to_get

        for uuid_to_get in uuids:
            for page in get_pages( cfg['geonetwork']['url'], cfg['geonetwork']['records_per_page'], uuid=uuid_to_get, the_filter=uuids_to_filter_out ):
                # publish with the key the queue is bound with: on a direct exchange,
                # any other key (e.g. the queue name) would leave the message unroutable
                send_page(cfg['session']['id'], cfg['geonetwork']['url'], cfg['indexer']['index'], page, channel, exchange, routing_key)
    finally:
        connection.close()
if __name__ == '__main__':

    import sys
    import yaml
    import signal

    signal.signal(signal.SIGINT, exit_gracefully)

    # read 'n' parse the configuration.
    # safe_load: yaml.load() without an explicit Loader can construct arbitrary Python objects
    # and raises TypeError on PyYAML >= 6.
    with open("config.yaml", 'r', encoding='utf-8') as yamlfile:
        cfg = yaml.safe_load(yamlfile)

    try:
        main(cfg)
    except Exception as e:
        # keep the traceback in the log and report failure through the exit status
        logging.exception(e)
        sys.exit(1)