import pika import msgpack import requests import urllib import json import time from elasticsearch import Elasticsearch, NotFoundError, AuthorizationException from copy import deepcopy #from pymongo import MongoClient from collections import OrderedDict from lib.geonetwork_helper import get_metadata_records from lib.exit_gracefully import exit_gracefully from lib.my_logging import logging from lib.locker import lock, unlock, is_locked def setup_indices(cfg): source_es = Elasticsearch([cfg['indexer']['url']], timeout=60) source_index = cfg['indexer']['index'] es_body = { "settings" : { "number_of_shards" : 1, "number_of_replicas" : 0, "index.mapping.total_fields.limit": 10000, "refresh_interval": "30s" }, "mappings": { "_doc": { "dynamic_templates": [ # priority is given by order! { "uuid" : { "path_match": "uuid", "mapping": { "type": "text", "index": False, "fields": { "keyword": { "type": "keyword" } } } } }, { "default" : { "path_match": "*", "mapping": { "enabled": "false" } } } ] } } } try: es_body['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas'] rep = source_es.indices.create(source_index, es_body)#, wait_for_active_shards=0) except Exception as e: logging.error(e) destin_es = Elasticsearch([cfg['indexer']['url']], timeout=60) destin_index = cfg['indexer']['index'] from lib.elasticsearch_template import template template['index_patterns'] = [ cfg['reindexer']['destination_index'] ] template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards'] template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas'] try: rep = destin_es.indices.delete_template(cfg['reindexer']['template_name']) logging.debug(rep) except Exception as e: logging.error(e) try: rep = destin_es.indices.create(cfg['reindexer']['template_name'], template) logging.debug(rep) except Exception as e: logging.warning(e) return # def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel, the_exchange, the_routing_key ): def send_record_to_the_metadata_processor( the_cfg, the_record ): """ This function sends a GeoNetwork metadata record to a RabbitMQ queue. """ the_cfg = deepcopy(the_cfg) connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port'])) channel = connection.channel() exchange = the_cfg['rabbitmq']['exchange'] queue_name = the_cfg['rabbitmq']['queue_name_1'] routing_key = the_cfg['rabbitmq']['routing_key_1'] del the_cfg['rabbitmq']['queue_name_1'] del the_cfg['rabbitmq']['routing_key_1'] channel.exchange_declare(exchange=exchange, exchange_type='direct') channel.queue_declare(queue=queue_name, durable=True, arguments={'x-message-ttl' : the_cfg['rabbitmq']['ttl']}) channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key) msg = dict() msg['header'] = dict() msg['header']['cfg'] = the_cfg msg['body'] = the_record the_body = msgpack.packb(msg, use_bin_type=False) channel.basic_publish( exchange=exchange, routing_key=routing_key, body=the_body, properties=pika.BasicProperties(delivery_mode = 2) ) connection.close() return # def delete_dataset_from_indices( the_cfg, the_uuid ): # # the_cfg = deepcopy(the_cfg) # # connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port'])) # channel = connection.channel() # exchange = the_cfg['rabbitmq']['exchange'] # queue_name = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_4_suffix'] # routing_key = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['routing_key_4_suffix'] # # del the_cfg['rabbitmq']['queue_name_4_suffix'] # del the_cfg['rabbitmq']['routing_key_4_suffix'] # # channel.exchange_declare(exchange=exchange, exchange_type='direct') # channel.queue_declare(queue=queue_name, durable=True) # channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key) # # msg = dict() # # msg['header'] = dict() # msg['header']['cfg'] = the_cfg # msg['body'] = the_uuid # # the_body = msgpack.packb(msg, use_bin_type=False) # # channel.basic_publish( exchange=exchange, # routing_key=routing_key, # body=the_body, # properties=pika.BasicProperties(delivery_mode = 2) # ) # # connection.close() # # return def delete_dataset_from_dest_index( the_cfg, the_uuid ): logging.info("Removing dataset with uuid = %s from the destination index..." % the_uuid) es = Elasticsearch([the_cfg['indexer']['url']], timeout=60) es_logger = logging.getLogger('elasticsearch') es_logger.setLevel(logging.INFO) index = the_cfg['indexer']['index'] # only one version should be present, either "meta" or "full" # we try removing them both as we do not know which one is present for a given dataset for suffix in ['meta', 'full']: the_query = dict() the_query['query'] = dict() the_query['query']['term'] = {'uuid.keyword': '{0}.{1}'.format(the_uuid, suffix)} try: res = es.delete_by_query(index, doc_type='_doc', body=the_query, conflicts='proceed', refresh=True, wait_for_completion=False) logging.debug(res) task_id = res['task'] # wait until ES is done seconds_to_sleep_for = 1 while True: res = es.tasks.get(task_id=task_id) logging.debug(res) completed = res['completed'] if not completed: logging.info('Waiting for delete_by_query to complete: sleeping for %i seconds...' % seconds_to_sleep_for) time.sleep(seconds_to_sleep_for) seconds_to_sleep_for += 1 else: break except AuthorizationException as e: logging.critical(e) exit(1) except NotFoundError as e: logging.error(e) except Exception as e: logging.error(e) return def get_metadata_records_processor( the_cfg, the_root_url, the_no_records_per_page, the_uuid_to_get, the_uuids_to_filter_out=None, the_username=None, the_password=None): for record in get_metadata_records( root_url=the_root_url, no_records_per_page=the_no_records_per_page, uuid=the_uuid_to_get, the_filter=the_uuids_to_filter_out, username=the_username, password=the_password ): the_uuid = record['geonet:info']['uuid'] if is_locked(the_cfg['session']['working_directory'], the_uuid ): logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid) continue else: logging.info("Setting lock for dataset with uuid = %s" % the_uuid) lock(the_cfg['session']['working_directory'], the_uuid) # delete_dataset_from_indices(cfg, record['geonet:info']['uuid']) delete_dataset_from_dest_index(the_cfg, the_uuid) send_record_to_the_metadata_processor(the_cfg, record) def main(cfg): cfg = deepcopy(cfg) logging.info("Setting up ingest and digest ") setup_indices(cfg) uuids_to_get = cfg['metadata_getter']['uuids_to_get'] uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out'] del cfg['metadata_getter'] # <- as this info is no longer needed username = cfg['geonetwork']['username'] del cfg['geonetwork']['username'] # <- as this info is no longer needed password = cfg['geonetwork']['password'] del cfg['geonetwork']['password'] # <- as this info is no longer needed if 'all' in uuids_to_get: uuids_to_get = [None] for uuid_to_get in uuids_to_get: get_metadata_records_processor( the_cfg = cfg, the_root_url = cfg['geonetwork']['url'], the_no_records_per_page = cfg['geonetwork']['records_per_page'], the_uuid_to_get = uuid_to_get, the_uuids_to_filter_out = uuids_to_filter_out, the_username = username, the_password = password) # if 'all' not in uuids_to_get: # # # get some datasets only # for uuid_to_get in uuids_to_get: # for record in get_metadata_records( cfg['geonetwork']['url'], # cfg['geonetwork']['records_per_page'], # uuid=uuid_to_get, # the_filter=uuids_to_filter_out, # username=username, password=password ): # # the_uuid = record['geonet:info']['uuid'] # # if is_locked( cfg['session']['working_directory'], the_uuid ): # logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid) # continue # else: # lock(cfg['session']['working_directory'], the_uuid) # # # delete_dataset_from_indices(cfg, record['geonet:info']['uuid']) # delete_dataset_from_dest_index(cfg, the_uuid) # send_record_to_the_metadata_processor(cfg, record) # # else: # # # get all the datasets # for record in get_metadata_records( cfg['geonetwork']['url'], # cfg['geonetwork']['records_per_page'], # uuid=None, # the_filter=uuids_to_filter_out, # username=username, password=password ): # # the_uuid = record['geonet:info']['uuid'] # # if is_locked( cfg['session']['working_directory'], the_uuid ): # logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid) # continue # else: # lock(cfg['session']['working_directory'], the_uuid) # # # delete_dataset_from_indices(cfg, record['geonet:info']['uuid']) # delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid']) # send_record_to_the_metadata_processor(cfg, record) #connection.close() return if __name__ == '__main__': import signal import argparse from yaml import load, dump try: from yaml import CLoader as Loader, CDumper as Dumper except ImportError: from yaml import Loader, Dumper signal.signal(signal.SIGINT, exit_gracefully) parser = argparse.ArgumentParser(description='Metadata getter') parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True) parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672) parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) # parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) args = parser.parse_args() logging.getLogger().setLevel(args.loglevel) logging.info('Starting...') # read 'n' parse the configuration with open("config.yaml", 'r') as yamlfile: cfg = load(yamlfile, Loader=Loader) cfg['rabbitmq']['host'] = args.host cfg['rabbitmq']['port'] = args.port cfg['rabbitmq']['exchange'] = args.exchange try: main(cfg) except pika.exceptions.AMQPConnectionError: logging.info('RabbitMQ is not reachable: exiting.') #time.sleep(5) except Exception as e: logging.error(e)