import pika
import msgpack
import requests
from pymongo import MongoClient

from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging


class RecordNotFound(Exception):
    pass


def filter_function(record, the_uuids_to_filter_out):
    """Return True if the record's uuid is NOT in the exclusion list."""
    return record['geonet:info']['uuid'] not in the_uuids_to_filter_out


# GEONETWORK METADATA GETTER
def get_pages(root_url, no_records_per_page, uuid=None, the_filter=None):
    """Yield pages of metadata records fetched from GeoNetwork's 'q' service."""

    # GeoSource's q service
    params = {
        '_content_type': 'json',
        'resultType': 'details',
        'sortBy': 'source',  # N.B.: sorting by title yields duplicates!
        # 'sortOrder': 'reverse',
        'fast': 'index',
        'buildSummary': 'false',
    }

    if uuid is not None:
        params['uuid'] = uuid

    cnt = 0

    while True:

        params['from'] = 1 + cnt * no_records_per_page
        params['to'] = params['from'] + no_records_per_page - 1

        logging.debug("Get metadata pages, from record no. %s to record no. %s."
                      % (params['from'], params['to']))

        res = requests.get(root_url, params=params)
        logging.debug(res.url)

        data = res.json()

        try:
            data['metadata']
        except KeyError:
            raise RecordNotFound("The record with uuid=%s was not found! "
                                 "Are you sure that it actually exists?" % uuid)

        # The service returns a list of records, or a single dict when only one
        # record matches: normalize to a list.
        if isinstance(data['metadata'], list):
            records = data['metadata']
        else:
            records = [data['metadata']]

        logging.debug("Got %s records." % len(records))

        filtered_records = records

        # apply the exclusion filter, if any
        if the_filter is not None:
            logging.debug("Filtering out unwanted records, if present.")
            filtered_records = [x for x in records if filter_function(x, the_filter)]

        yield filtered_records

        if len(records) < no_records_per_page:
            break  # we have reached the last page

        cnt += 1


def send_page(the_session_id, the_geonetwork_root_url, the_dest_index, the_page,
              the_channel, the_exchange, the_routing_key):
    """Send a page of GeoNetwork results to a RabbitMQ queue."""

    msg = {
        'header': {
            'geonetwork_root_url': the_geonetwork_root_url,
            'session_id': the_session_id,
            'dest_index': the_dest_index,
        },
        'body': the_page,
    }

    the_body = msgpack.packb(msg, use_bin_type=True)

    the_channel.basic_publish(
        exchange=the_exchange,
        routing_key=the_routing_key,
        body=the_body,
        properties=pika.BasicProperties(delivery_mode=2),  # make the message persistent
    )


def main(cfg):

    import datetime

    logging.info('Starting indexing session: %s.' % cfg['session']['id'])

    # log the indexing session into MongoDB
    mongo_client = MongoClient('mongodb://%s:%s@%s:%s/'
                               % (cfg['mongo']['username'],
                                  cfg['mongo']['password'],
                                  cfg['mongo']['host'],
                                  cfg['mongo']['port']))
    mongo_db = mongo_client[cfg['mongo']['report-db']]
    collection = mongo_db['indexing-sessions']

    doc = dict()
    doc['_id'] = cfg['session']['id']
    doc['started_at'] = datetime.datetime.utcnow()
    doc['configuration'] = cfg

    res = collection.replace_one({'_id': doc['_id']}, doc, upsert=True)

    if not res.acknowledged:
        logging.error("Unable to push the following doc to MongoDB:")
        logging.error(doc)
        exit(1)

    # ------------------------------------------------------------------

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    channel = connection.channel()

    exchange = cfg['rabbitmq']['exchange']
    queue_name = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_1_suffix']
    routing_key = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_1_suffix']

    channel.exchange_declare(exchange=exchange, exchange_type='direct')
    channel.queue_declare(queue=queue_name, durable=True)
    channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

    uuids_to_get = cfg['metadata_getter']['uuids_to_get']
    uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']

    # publish with the same routing key the queue was bound with
    if 'all' not in uuids_to_get:
        for uuid_to_get in uuids_to_get:
            for page in get_pages(cfg['geonetwork']['url'],
                                  cfg['geonetwork']['records_per_page'],
                                  uuid=uuid_to_get,
                                  the_filter=uuids_to_filter_out):
                send_page(cfg['session']['id'], cfg['geonetwork']['url'],
                          cfg['indexer']['index'], page, channel, exchange, routing_key)
    else:
        for page in get_pages(cfg['geonetwork']['url'],
                              cfg['geonetwork']['records_per_page'],
                              uuid=None,
                              the_filter=uuids_to_filter_out):
            send_page(cfg['session']['id'], cfg['geonetwork']['url'],
                      cfg['indexer']['index'], page, channel, exchange, routing_key)

    connection.close()


if __name__ == '__main__':

    import yaml
    import signal

    signal.signal(signal.SIGINT, exit_gracefully)

    # read and parse the configuration
    with open("config.yaml", 'r') as yamlfile:
        cfg = yaml.safe_load(yamlfile)

    try:
        main(cfg)
    except Exception as e:
        logging.error(e)
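
# A minimal sketch of the config.yaml structure this script expects, assuming
# only the keys read above (session, geonetwork, rabbitmq, mongo,
# metadata_getter, indexer); all values are hypothetical placeholders.
#
#   session:
#     id: "some-session-id"
#   geonetwork:
#     url: "https://example.org/geonetwork/srv/eng/q"
#     records_per_page: 50
#   rabbitmq:
#     host: "localhost"
#     exchange: "indexing"
#     queue_name_1_suffix: "pages"
#     routing_key_1_suffix: "pages"
#   mongo:
#     host: "localhost"
#     port: 27017
#     username: "user"
#     password: "secret"
#     report-db: "indexing-reports"
#   metadata_getter:
#     uuids_to_get:
#       - "all"
#     uuids_to_filter_out: []
#   indexer:
#     index: "my-index"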