Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import pika
import msgpack
import requests
from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
from pymongo import MongoClient
class RecordNotFound(Exception):
    """Raised when a GeoNetwork query response carries no ``metadata`` entry."""
# GEONETWORK METADATA GETTER
def get_pages(root_url, no_records_per_page, uuid=None, timeout=None):
    """
    Yield successive pages of metadata records from a GeoNetwork ``q`` service.

    Pages are requested with 1-based, inclusive ``from``/``to`` windows of
    ``no_records_per_page`` records. Paging stops after a page shorter than
    ``no_records_per_page``, or when a later page comes back with no
    ``metadata`` entry at all. The second case happens when the total number of
    hits is an exact multiple of the page size.

    :param root_url: URL of the GeoNetwork ``q`` search service.
    :param no_records_per_page: number of records requested per page.
    :param uuid: optional record uuid; when given, the query is restricted to it.
    :param timeout: optional ``requests`` timeout in seconds (``None`` = wait forever,
        which was the historical behaviour).
    :yields: a list of metadata records, as decoded from the JSON response.
    :raises RecordNotFound: if the very first page holds no ``metadata`` entry.
    """
    params = {
        '_content_type': 'json',
        'resultType': 'details',
        'sortBy': 'source',  # N.B.: sorting by title yields duplicates!
        'fast': 'index',
        'buildSummary': 'false',
    }
    if uuid is not None:
        params['uuid'] = uuid

    cnt = 0
    while True:
        params['from'] = 1 + cnt * no_records_per_page
        params['to'] = params['from'] + no_records_per_page - 1
        logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'], params['to']))

        res = requests.get(root_url, params=params, timeout=timeout)
        logging.debug(res.url)
        payload = res.json()  # decode the body once instead of on every access

        try:
            metadata = payload['metadata']
        except KeyError:
            if cnt > 0:
                # We only get here after a full previous page. An empty follow-up page
                # (presumably answered without a 'metadata' key, as for an unknown
                # uuid) just means there is nothing left to fetch.
                break
            if uuid is None:
                raise RecordNotFound('No metadata record was found at %s.' % root_url) from None
            raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists?' % uuid) from None

        # A single hit may come back as a bare object rather than a one-element list.
        records = metadata if isinstance(metadata, list) else [metadata]
        logging.debug("Got %s records." % len(records))
        yield records

        if len(records) < no_records_per_page:
            break  # a short page means we have reached the last one
        cnt += 1
def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
    """
    Publish one page of GeoNetwork results on a RabbitMQ channel.

    The page is wrapped in a ``{'header': ..., 'body': ...}`` envelope. The header
    carries the GeoNetwork root URL, the session id and the destination index.
    The envelope is serialised with msgpack (``use_bin_type=True``) and published
    with ``delivery_mode=2`` to ``the_exchange`` using ``the_routing_key``.
    """
    header = {
        'geonetwork_root_url': the_geonetwork_root_url,
        'session_id': the_session_id,
        'dest_index': the_dest_index,
    }
    packed = msgpack.packb({'header': header, 'body': the_page}, use_bin_type=True)
    props = pika.BasicProperties(delivery_mode=2)
    the_channel.basic_publish(
        exchange=the_exchange,
        routing_key=the_routing_key,
        body=packed,
        properties=props,
    )
def main(cfg):
    """
    Run one metadata-getting session.

    Steps:
      1. Upsert a session document (id, start time, full configuration) into the
         ``indexing-sessions`` collection of the MongoDB report database.
      2. Declare a direct exchange and a durable, session-specific queue on RabbitMQ
         and bind them with the session-specific routing key.
      3. Fetch GeoNetwork metadata page by page, either for every record (when
         ``'all'`` is listed in ``cfg['metadata_getter']['uuids']``) or for each
         listed uuid. Publish each page with ``send_page``.

    Exits the process with status 1 if MongoDB does not acknowledge the write.
    """
    import datetime
    import sys
    from urllib.parse import quote_plus

    session_id = cfg['session']['id']
    logging.info('Starting indexing session: %s.' % session_id)

    # --- log the indexing session into MongoDB --------------------------------
    # Credentials must be percent-escaped, or characters such as '@', ':' or '/'
    # break the connection URI. str() keeps numeric YAML values working, as the
    # original '%s' formatting did.
    mongo_client = MongoClient('mongodb://%s:%s@%s:%s/' % (quote_plus(str(cfg['mongo']['username'])),
                                                           quote_plus(str(cfg['mongo']['password'])),
                                                           cfg['mongo']['host'],
                                                           cfg['mongo']['port']))
    try:
        collection = mongo_client[cfg['mongo']['report-db']]['indexing-sessions']
        doc = {
            '_id': session_id,
            'started_at': datetime.datetime.now(datetime.timezone.utc),
            # NOTE(review): cfg includes the MongoDB credentials, which therefore end up
            # stored in the report collection - consider redacting them.
            'configuration': cfg,
        }
        res = collection.replace_one({'_id': doc['_id']}, doc, upsert=True)
        if not res.acknowledged:
            logging.error("Unable to push the following doc to MongoDB:")
            logging.error(doc)
            sys.exit(1)
    finally:
        mongo_client.close()

    # --- publish GeoNetwork pages to RabbitMQ ---------------------------------
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    try:
        channel = connection.channel()

        exchange = cfg['rabbitmq']['exchange']
        queue_name = session_id + '_' + cfg['rabbitmq']['queue_name_1_suffix']
        routing_key = session_id + '_' + cfg['rabbitmq']['routing_key_1_suffix']

        channel.exchange_declare(exchange=exchange, exchange_type='direct')
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

        geonetwork_url = cfg['geonetwork']['url']
        per_page = cfg['geonetwork']['records_per_page']
        uuids_to_get = cfg['metadata_getter']['uuids']

        if 'all' in uuids_to_get:
            page_streams = [get_pages(geonetwork_url, per_page)]
        else:
            page_streams = (get_pages(geonetwork_url, per_page, uuid) for uuid in uuids_to_get)

        for pages in page_streams:
            for page in pages:
                # Publish with the same routing key the queue is bound with. On a
                # direct exchange any other key leaves the message unroutable.
                send_page(session_id, geonetwork_url, cfg['indexer']['index'],
                          page, channel, exchange, routing_key)
    finally:
        if connection.is_open:
            connection.close()
if __name__ == '__main__':
    import signal
    import sys

    import yaml

    signal.signal(signal.SIGINT, exit_gracefully)

    # Read and parse the configuration. safe_load is used because yaml.load without
    # an explicit Loader can build arbitrary Python objects and is rejected
    # outright by PyYAML >= 6.
    with open("config.yaml", 'r', encoding='utf-8') as yamlfile:
        cfg = yaml.safe_load(yamlfile)

    try:
        main(cfg)
    except Exception as e:
        # Keep the traceback and report failure through the exit status, so that a
        # failed session is not mistaken for a successful one.
        logging.exception(e)
        sys.exit(1)