import json
import urllib
from collections import OrderedDict
from copy import deepcopy

import msgpack
import pika
import requests
from elasticsearch import Elasticsearch, NotFoundError
from pymongo import MongoClient

from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
def filter_function( x, the_uuids_to_filter_out ):
    """Return True if the record's GeoNetwork uuid is NOT in the given exclusion collection."""
    record_uuid = x['geonet:info']['uuid']
    return record_uuid not in the_uuids_to_filter_out
def get_metadata_records( root_url, no_records_per_page, uuid=None, the_filter=None, username=None, password=None ):
    """Generator yielding GeoNetwork metadata records, fetched page by page.

    root_url            -- URL of GeoNetwork's "q" search service
    no_records_per_page -- page size used to paginate the search results
    uuid                -- if not None, restrict the search to this metadata uuid
    the_filter          -- optional collection of uuids to filter out of the results
    username, password  -- optional HTTP Basic Auth credentials

    Raises RecordNotFound when the response carries no 'metadata' key.
    (RecordNotFound is expected to be defined/imported elsewhere in this module.)
    """
    params = OrderedDict()

    # GeoSource's q service
    params['_content_type'] = 'json'
    params['resultType'] = 'details'
    params['sortBy'] = 'source' # N.B.: sort by title yields duplicates !
    #params['sortOrder'] = 'reverse'
    params['fast'] = 'index'
    params['buildSummary'] = 'false'

    if uuid is not None:
        params['uuid'] = uuid

    cnt = 0

    while True:
        # 1-based, inclusive pagination window
        params['from'] = 1 + cnt*no_records_per_page
        params['to'] = params['from'] + no_records_per_page - 1

        logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'],params['to']))

        if username is not None and password is not None:
            res = requests.get(root_url, params=params, auth=(username, password))
        else:
            res = requests.get(root_url, params=params)

        logging.debug(res.url)

        # parse the response body once instead of re-parsing it at every access
        payload = res.json()

        try:
            metadata = payload['metadata']
        except KeyError:
            raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists and that you have the proper access rights?' % uuid)

        # a single record comes back as a dict, multiple ones as a list
        if type(metadata) is list:
            records = metadata
        else:
            records = [metadata]

        logging.debug("Got %s records." % len(records))

        # adding hrefs...
        params_copy = params.copy()
        del params_copy['from']
        del params_copy['to']
        del params_copy['sortBy']

        for record in records:
            params_copy['uuid'] = record['geonet:info']['uuid']
            record['href'] = root_url + "?" + urllib.parse.urlencode(params_copy)

        # apply filter
        filtered_records = records

        if the_filter is not None:
            logging.debug("Filtering out unwanted records, if present.")
            filtered_records = [x for x in records if x['geonet:info']['uuid'] not in the_filter]

        # FIX: the records were computed but never handed back to the caller;
        # callers iterate over this function, so each surviving record must be yielded.
        for record in filtered_records:
            yield record

        if len(records) < no_records_per_page:
            break # it means that we have reached the last page

        cnt += 1
# def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ):
def send_record_to_the_metadata_processor( the_cfg, the_record ):
    """This function sends a GeoNetwork metadata record to a RabbitMQ queue."""
    the_cfg = deepcopy(the_cfg)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
    channel = connection.channel()
    exchange = the_cfg['rabbitmq']['exchange']
    queue_name = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_1_suffix']
    routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_1_suffix']

    # the suffixes are only needed to derive the queue / routing-key names above
    del the_cfg['rabbitmq']['queue_name_1_suffix']
    del the_cfg['rabbitmq']['routing_key_1_suffix']

    channel.exchange_declare(exchange=exchange, exchange_type='direct')
    channel.queue_declare(queue=queue_name, durable=True)
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

    # FIX: msg was used without ever being initialized, and the record itself was
    # never attached to the message body; the commented-out delete_dataset_from_indices
    # in this file shows the intended msg layout (header.cfg + body).
    msg = dict()
    msg['header'] = dict()
    msg['header']['cfg'] = the_cfg
    msg['body'] = the_record

    the_body = msgpack.packb(msg, use_bin_type=False)

    channel.basic_publish( exchange=exchange,
                           routing_key=routing_key,
                           body=the_body,
                           properties=pika.BasicProperties(delivery_mode = 2)
                         )

    connection.close()

    return
# def delete_dataset_from_indices( the_cfg, the_uuid ):
#
# the_cfg = deepcopy(the_cfg)
#
# connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
# channel = connection.channel()
# exchange = the_cfg['rabbitmq']['exchange']
# queue_name = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_4_suffix']
# routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_4_suffix']
#
# del the_cfg['rabbitmq']['queue_name_4_suffix']
# del the_cfg['rabbitmq']['routing_key_4_suffix']
#
# channel.exchange_declare(exchange=exchange, exchange_type='direct')
# channel.queue_declare(queue=queue_name, durable=True)
# channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
#
# msg = dict()
#
# msg['header'] = dict()
# msg['header']['cfg'] = the_cfg
# msg['body'] = the_uuid
#
# the_body = msgpack.packb(msg, use_bin_type=False)
#
# channel.basic_publish( exchange=exchange,
# routing_key=routing_key,
# body=the_body,
# properties=pika.BasicProperties(delivery_mode = 2)
# )
#
# connection.close()
#
# return
def delete_dataset_from_dest_index( the_cfg, the_uuid ):
    """Remove both the 'meta' and 'full' flavours of a dataset from the destination index.

    Only one of the two flavours should actually be present for a given dataset,
    but since we cannot tell which one, a delete-by-query is issued for both;
    a missing document is simply ignored, any other failure is logged and swallowed
    (best-effort cleanup).
    """
    logging.info("Removing dataset with uuid = %s from the destination index..." % the_uuid)

    es = Elasticsearch([the_cfg['indexer']['url']])
    es_logger = logging.getLogger('elasticsearch')
    es_logger.setLevel(logging.INFO)

    index = the_cfg['indexer']['index']

    # only one version should be present, either "meta" or "full"
    # we try removing them both as we do not know which one is present for a given dataset
    for suffix in ['meta', 'full']:
        the_query = {'query': {'term': {'uuid.keyword': '{0}.{1}'.format(the_uuid, suffix)}}}

        try:
            res = es.delete_by_query(index, doc_type='_doc', body=the_query)
            logging.debug(res)
        except NotFoundError:
            # nothing to delete for this flavour
            pass
        except Exception as e:
            logging.error(e)

    return
def main(cfg):
    """Run one indexing session.

    Every metadata record fetched from GeoNetwork first has any stale copy
    removed from the destination Elasticsearch index, then is queued for the
    metadata processor via RabbitMQ.
    """
    cfg = deepcopy(cfg)

    logging.info('Starting indexing session: %s.' % cfg['session']['id'])

    uuids_to_get = cfg['metadata_getter']['uuids_to_get']
    uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']
    del cfg['metadata_getter'] # <- as this info is no longer needed

    # credentials are stripped from cfg so they never travel inside queued messages
    username = cfg['geonetwork']['username']
    del cfg['geonetwork']['username'] # <- as this info is no longer needed
    password = cfg['geonetwork']['password']
    del cfg['geonetwork']['password'] # <- as this info is no longer needed

    # get some datasets only
    # NOTE(review): this loop and the "get all the datasets" loop below both run
    # unconditionally; presumably one of the two was meant to be selected by a
    # condition on uuids_to_get — confirm against the original revision.
    for uuid_to_get in uuids_to_get:
        for record in get_metadata_records( cfg['geonetwork']['url'],
                                            cfg['geonetwork']['records_per_page'],
                                            uuid=uuid_to_get,
                                            the_filter=uuids_to_filter_out,
                                            username=username, password=password ):
            # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
            delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
            send_record_to_the_metadata_processor(cfg, record)

    # get all the datasets
    for record in get_metadata_records( cfg['geonetwork']['url'],
                                        cfg['geonetwork']['records_per_page'],
                                        uuid=None,
                                        the_filter=uuids_to_filter_out,
                                        username=username, password=password ):
        # delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
        delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
        send_record_to_the_metadata_processor(cfg, record)

    #connection.close()

    return
if __name__ == '__main__':
import yaml
import signal