diff --git a/docker-compose-tools.yml b/docker-compose-tools.yml
index b99ffa8f031cf81044560a52fb3096d846e9d9a9..d9d0668bea62de737fbdbf1a8fa846586dfd09fd 100644
--- a/docker-compose-tools.yml
+++ b/docker-compose-tools.yml
@@ -30,14 +30,5 @@ services:
       - ${PWD}/config.yaml:/app/config.yaml:ro
       - working-directory:/app/output
 
-  main:
-    build: .
-    image: data-grandlyon-com-indexer
-    command: python main.py --host rabbitmq --exchange download_data_grandlyon_com_index --loglevel DEBUG
-    volumes:
-      - ${PWD}/config.yaml:/app/config.yaml:ro
-      - working-directory:/app/output
-
-
 volumes:
   working-directory:
diff --git a/docker-compose-workers.yml b/docker-compose-workers.yml
index 1f3e4b497e0f83b3888e76e4e668101427000538..b48696ab607be5ec97365f555fd3675cb3e2b7e4 100644
--- a/docker-compose-workers.yml
+++ b/docker-compose-workers.yml
@@ -34,7 +34,7 @@ services:
     command: python workers/doc-processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_process --loglevel DEBUG
     volumes:
       # - ./config.yaml:/app/config.yaml:ro
-      - working-directory:/app/output
+      - working-directory:/app/output
     restart: unless-stopped
 
   doc-indexer:
@@ -104,17 +104,7 @@ services:
 #    volumes:
 #      - ./config.yaml:/app/config.yaml:ro
 
-# main:
-#   build: .
-#   image: data-grandlyon-com-process-main
-#   depends_on:
-#     - rabbitmq
-#   command: python ./main.py --host rabbitmq --exchange download_data_grandlyon_com_index --loglevel DEBUG
-#   volumes:
-#     - ./config.yaml:/app/config.yaml:ro
-
 volumes:
   rabbitmq:
   working-directory:
   mongo:
-# main:
diff --git a/main.py b/main.py
index 36dee0b0b44c5abbfe45687b426854881f73dae3..03cad9c152fe63e7a099a33b3a919b8cc1e81506 100644
--- a/main.py
+++ b/main.py
@@ -52,7 +52,7 @@ def setup_indices(cfg):
     }
 
     try:
-        rep = source_es.indices.create(source_index, es_body)#, wait_for_active_shards=0)
+        rep = source_es.indices.create(source_index, es_body)  # , wait_for_active_shards=0)
     except Exception as e:
         logging.error(e)
 
@@ -73,11 +73,10 @@ def setup_indices(cfg):
 
     rep = destin_es.indices.put_template(cfg['reindexer']['template_name'], template)
     logging.debug(rep)
-
-
-
 # def send_page( the_session_id, the_geonetwork_root_url, the_dest_index, the_page, the_channel,
 #                the_exchange, the_routing_key ):
+
+
 def send_record_to_the_metadata_processor(the_rabbit, the_cfg, the_record):
     """ This function sends a GeoNetwork metadata record to a RabbitMQ queue.
@@ -108,7 +107,8 @@ def send_record_to_the_metadata_processor(the_rabbit, the_cfg, the_record):
 
     # the_cfg = deepcopy(the_cfg)
     #
-    # connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
+    # connection = pika.BlockingConnection(pika.ConnectionParameters(
+    #     host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
     # channel = connection.channel()
     # exchange = the_cfg['rabbitmq']['exchange']
     # queue_name = the_cfg['rabbitmq']['queue_name_1']
@@ -144,7 +144,8 @@ def send_record_to_the_metadata_processor(the_rabbit, the_cfg, the_record):
 #
 # the_cfg = deepcopy(the_cfg)
 #
-# connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
+# connection = pika.BlockingConnection(
+#     pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
 # channel = connection.channel()
 # exchange = the_cfg['rabbitmq']['exchange']
 # queue_name = the_cfg['session']['id'] + '_' + the_cfg['rabbitmq']['queue_name_4_suffix']
@@ -206,14 +207,13 @@ def delete_dataset_from_dest_index(the_rabbit, the_cfg, the_uuid):
     try:
         res = es.delete_by_query(index, doc_type='_doc', body=the_query)
         es.indices.refresh(index=index)
-    except AuthorizationException as e:
-        logging.critical(e)
+    except AuthorizationException as ae:
+        logging.critical(ae)
         exit(1)
-    except NotFoundError as e:
-        logging.error(e)
-    except Exception as e:
-        logging.error(e)
-
+    except NotFoundError as nfe:
+        logging.error(nfe)
+    except Exception as ex:
+        logging.error(ex)
 
 
 def get_metadata_records_processor(the_rabbit,
@@ -256,15 +256,15 @@ def main(cfg):
     logging.info("Setting up ingest and digest ")
     setup_indices(cfg)
 
-    uuids_to_get        = cfg['metadata_getter']['uuids_to_get']
+    uuids_to_get = cfg['metadata_getter']['uuids_to_get']
     uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out']
-    del cfg['metadata_getter'] # <- as this info is no longer needed
+    del cfg['metadata_getter']  # <- as this info is no longer needed
 
     username = cfg['geonetwork']['username']
-    del cfg['geonetwork']['username'] # <- as this info is no longer needed
+    del cfg['geonetwork']['username']  # <- as this info is no longer needed
 
     password = cfg['geonetwork']['password']
-    del cfg['geonetwork']['password'] # <- as this info is no longer needed
+    del cfg['geonetwork']['password']  # <- as this info is no longer needed
 
     if 'all' in uuids_to_get:
         uuids_to_get = [None]
@@ -295,19 +295,17 @@ def main(cfg):
                                            the_password=password)
         # ---------------------- send log ----------------------------
         log_message2 = LogMessage(session_id=cfg['session']['id'],
-                                   uuid=uuid_to_get,
-                                   step='main',
-                                   status='Terminated',
-                                   uuid_prefix='meta',
-                                   info='no info'
-                                   )
+                                  uuid=uuid_to_get,
+                                  step='main',
+                                  status='Terminated',
+                                  uuid_prefix='meta',
+                                  info='no info'
+                                  )
         cfg['session']['current_uuid'] = uuid_to_get
         rabbit.publish_log(log_message=log_message2.__dict__)
         # ------------------------------------------------------------
     # <-- the rabbit connexion is automatically closed here
 
-
-
 # if 'all' not in uuids_to_get:
 #
 #     # get some datasets only
@@ -351,8 +349,7 @@ def main(cfg):
 #             delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
 #             send_record_to_the_metadata_processor(cfg, record)
 
-
-    #connection.close()
+    # connection.close()
 
     return
 
@@ -374,8 +371,10 @@ if __name__ == '__main__':
     parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
     parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
     # parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
-    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
-    parser.add_argument('--sessionid', dest='sessionid', help='the session id', default="ad08d73d-bf3f-457a-bb5e-c4ab561d4666", type=str)
+    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
+                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
+    parser.add_argument('--sessionid', dest='sessionid', help='the session id',
+                        default="ad08d73d-bf3f-457a-bb5e-c4ab561d4666", type=str)
 
     args = parser.parse_args()
 
@@ -395,6 +394,6 @@ if __name__ == '__main__':
             main(cfg)
     except pika.exceptions.AMQPConnectionError:
         logging.info('RabbitMQ is not reachable: exiting.')
-        #time.sleep(5)
+        # time.sleep(5)
     except Exception as e:
         logging.error(e)
diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py
index adf2ce03323dacc8408390f7075b68ad4100d2b2..a6305e507679eca2fbcab2d001ec28db585c15c9 100644
--- a/workers/doc-indexer.py
+++ b/workers/doc-indexer.py
@@ -1,8 +1,6 @@
 import pika
 import msgpack
-import time
 import json
-import hashlib
 import os, sys
 from elasticsearch import Elasticsearch
 from elasticsearch.exceptions import AuthorizationException
@@ -17,12 +15,14 @@
 from lib.my_logging import logging
 from lib.rabbit_session import RabbitSession
 from lib.log_message import LogMessage
 
+
 class DocIndexer:
     def __init__(self, cfg):
         self.cfg = cfg
         self.rabbit = None
 
-    def tag_doc(self, the_doc):
+    @staticmethod
+    def tag_doc(the_doc):
         # tag_list = ['isOpenAccess', 'isRealTime', 'isQueryable', 'isSearchable', 'isPunctual', 'isLinear', 'isAreal']
         tag_dict = {}
@@ -32,23 +32,25 @@ class DocIndexer:
         # tag_dict[tag] = False
 
         # isOpen?
-        if 'license' in the_doc['metadata-fr'].keys() and not any( [x in the_doc['metadata-fr']['license'] for x in ['Licence Associée', 'Licence Engagée'] ] ):
+        if 'license' in the_doc['metadata-fr'].keys() and not any([x in the_doc['metadata-fr']['license']
+                                                                   for x in ['Licence Associée', 'Licence Engagée']]):
             tag_dict['isOpenAccess'] = True
         else:
             tag_dict['isOpenAccess'] = False
 
         # isRealTime?
-        if 'updateFrequency' in the_doc['metadata-fr'].keys() and 'continual' in the_doc['metadata-fr']['updateFrequency']:
+        if 'updateFrequency' in the_doc['metadata-fr'].keys() \
+                and 'continual' in the_doc['metadata-fr']['updateFrequency']:
             tag_dict['isRealTime'] = True
         else:
             tag_dict['isRealTime'] = False
 
         # isQueryable?
-        tag_dict['isQueryable'] = False # default
+        tag_dict['isQueryable'] = False  # default
         if 'link' in the_doc['metadata-fr'].keys():
             for link in the_doc['metadata-fr']['link']:
-                #print(link)
-                if 'service' in link.keys() and any( [x in link['service'] for x in ['WFS', 'WMS', 'KML', 'WS']] ):
+                # print(link)
+                if 'service' in link.keys() and any([x in link['service'] for x in ['WFS', 'WMS', 'KML', 'WS']]):
                     tag_dict['isQueryable'] = True
                     break
 
@@ -57,7 +59,7 @@ class DocIndexer:
         # 'isPunctual', 'isLinear', 'isAreal' will be absent instead of being 'falsely' set to false!
 
         # isSearchable?
-        tag_dict['isSearchable'] = False # default
+        tag_dict['isSearchable'] = False  # default
         if 'data-fr' in the_doc.keys():
             tag_dict['isSearchable'] = True
 
@@ -65,19 +67,19 @@ class DocIndexer:
 
             # init
             tag_dict['isPunctual'] = False
-            tag_dict['isLinear']   = False
-            tag_dict['isAreal']    = False
+            tag_dict['isLinear'] = False
+            tag_dict['isAreal'] = False
 
             # isPunctual?
-            if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Point', 'MultiPoint']] ):
+            if any([x in the_doc['data-fr']['geometry']['type'] for x in ['Point', 'MultiPoint']]):
                 tag_dict['isPunctual'] = True
 
            # isLinear?
-            if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['LineString', 'MultiLineString']] ):
+            if any([x in the_doc['data-fr']['geometry']['type'] for x in ['LineString', 'MultiLineString']]):
                 tag_dict['isLinear'] = True
 
             # isAreal?
-            if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Polygon', 'MultiPolygon']] ):
+            if any([x in the_doc['data-fr']['geometry']['type'] for x in ['Polygon', 'MultiPolygon']]):
                 tag_dict['isAreal'] = True
 
         # isSample? docs that are tagged by this script are never just a sample
@@ -105,32 +107,31 @@ class DocIndexer:
         cfg = decoded_body['header']['cfg']
         progress_ratio = decoded_body['header']['progress_ratio']
-        count          = decoded_body['header']['count']
+        count = decoded_body['header']['count']
 
         es = Elasticsearch([cfg['indexer']['url']], timeout=60)
 
         # es_logger = logging.getLogger('elasticsearch')
         # es_logger.setLevel(logging.INFO)
-
         if type(decoded_body['body']) is list:
-            #docs_to_index = decoded_body['body']
+            # docs_to_index = decoded_body['body']
             docs_to_index = [self.tag_doc(doc) for doc in decoded_body['body']]
         else:
-            #docs_to_index = [decoded_body['body']]
+            # docs_to_index = [decoded_body['body']]
             docs_to_index = [self.tag_doc(decoded_body['body'])]
 
-
-        logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug']))
+        logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100,
+                                                                                             len(docs_to_index),
+                                                                                             docs_to_index[0]['slug']))
 
         es_body = ''
-        header = {'index': { "_index" : cfg['indexer']['index'], "_type" : "_doc" }}
-        #{ "index" : { "_index" : the_dest_index, "_type" : "_doc" } }
+        header = {'index': {"_index": cfg['indexer']['index'], "_type": "_doc"}}
+        # { "index" : { "_index" : the_dest_index, "_type" : "_doc" } }
 
         for doc in docs_to_index:
             # header['index']['_id'] = hashlib.md5( json.dumps(tmp, sort_keys=True).encode("utf-8") ).hexdigest()
             es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
 
-
         rep = es.bulk(body=es_body)
 
         # print(rep)
 
@@ -138,17 +139,16 @@ class DocIndexer:
 
         if rep['errors'] is False:
             channel.basic_ack(delivery_tag=method.delivery_tag)
-            #print("")
+            # print("")
             logging.info("Done in %s seconds." % (t2-t1))
-
             if progress_ratio == 1:
 
                 uuid = docs_to_index[0]['uuid']
-                logging.info("Reached last page for dataset with uuid = %s. Creating reindexation task..." % (uuid))
+                logging.info("Reached last page for dataset with uuid = %s. Creating reindexation task..."
+                             % uuid)
 
                 exchange = cfg['rabbitmq']['exchange']
-                queue_name  = cfg['rabbitmq']['queue_name_4']
+                queue_name = cfg['rabbitmq']['queue_name_4']
                 routing_key = cfg['rabbitmq']['routing_key_4']
 
                 del cfg['rabbitmq']['queue_name_4']
@@ -179,16 +179,13 @@ class DocIndexer:
                 #                       body=the_body,
                 #                       properties=pika.BasicProperties(delivery_mode = 2))
 
-
-
         else:
-            channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
-            #print("")
+            channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
+            # print("")
             logging.error(json.dumps(rep, indent=4))
             logging.error("Failed")
 
-            #time.sleep(5)
+            # time.sleep(5)
 
        # ---------------------- send log ----------------------------
         log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'],
                                  # session_id=cfg['session']['id'],
@@ -213,7 +210,8 @@ class DocIndexer:
         # from lib.close_connection import on_timeout
 
-        # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
+        # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'],
+        #                                                                port=cfg['rabbitmq_port']))
         # # timeout = 5
         # # connection.add_timeout(timeout, on_timeout(connection))
         #
 
@@ -223,7 +221,8 @@ class DocIndexer:
         # docs_to_index_qn = cfg['rabbitmq_queue']
         #
         # channel.basic_qos(prefetch_count=1)
-        # channel.basic_consume(on_message_callback=lambda ch, method, properties, body: index_docs(ch, method, properties, body),
+        # channel.basic_consume(on_message_callback=lambda ch, method,
+        #                       properties, body: index_docs(ch, method, properties, body),
         #                       queue=docs_to_index_qn)#, no_ack=True)
         # channel.start_consuming()
         #
@@ -232,7 +231,6 @@ if __name__ == '__main__':
 
-    import yaml
     import time
     import signal
     import argparse
 
 
@@ -242,7 +240,8 @@ if __name__ == '__main__':
     parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
     parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
     parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
-    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
+    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
+                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
 
     args = parser.parse_args()
 
diff --git a/workers/doc-processor.py b/workers/doc-processor.py
index fda18a9568054fbf03357463f60907fb577eee27..15461bf31ce4030c1330cf642cdb66ee6ef8a4ef 100644
--- a/workers/doc-processor.py
+++ b/workers/doc-processor.py
@@ -82,7 +82,7 @@ class DocProcessor:
 
         return out_docs
 
     def process_docs(self, channel, method, properties, body):
-
+        print('inside process docs')
         decoded_body = msgpack.unpackb(body, raw=False, object_hook=decode_datetime)
 
@@ -100,6 +100,7 @@ class DocProcessor:
         # the queue this program will be writing to
         docs_to_index_qn = cfg['rabbitmq']['queue_name_2']
         docs_to_index_rk = cfg['rabbitmq']['routing_key_2']
+        print('docs_to_index_qn', docs_to_index_qn)
         del cfg['rabbitmq']['queue_name_2']
         del cfg['rabbitmq']['routing_key_2']
 
@@ -114,6 +115,7 @@ class DocProcessor:
         if progress_rounded % 10:
             info_message = '[' + str(progress_rounded) + '%] Sending ' + str(
                 len(docs)) + 'docs to RabbitMQ for dataset ' + docs[0]['slug'] + '...'
+            print('info_message:', info_message)
             # ---------------------- send log ----------------------------
             log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'],
                                      # session_id=cfg['session']['id'],
@@ -207,11 +209,15 @@
     signal.signal(signal.SIGINT, exit_gracefully)
 
     parser = argparse.ArgumentParser(description='Document processor')
-    parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
+    parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, default='localhost')
+    # parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, default='rabbitmq')
+
     parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
-    parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
-    parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
-    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
+    parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str,
+                        default='download_data_grandlyon_com_index')
+    parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, default='doc_pages_to_process')
+    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
+                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
 
     args = parser.parse_args()