diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py index 18104afb4bb4c736b18db64175d4d6bb77ccd8cc..92bc14854c60fda561f4fc511e69f972087c1769 100644 --- a/workers/doc-enricher.py +++ b/workers/doc-enricher.py @@ -257,7 +257,7 @@ def enrich_docs( channel, method, properties, body ): print(" [x] json body : ", json_body) - channel.basic_publish(exchange=exchange, + channel.basic_publish(exchange=exchange_logs_name, routing_key=routing_key_logs, body=json_body, properties=pika.BasicProperties(delivery_mode=2) @@ -294,7 +294,7 @@ def enrich_docs( channel, method, properties, body ): print(" [x] json body : ", json_body) - channel.basic_publish(exchange=exchange, + channel.basic_publish(exchange=exchange_logs_name, routing_key=routing_key_logs, body=json_body, properties=pika.BasicProperties(delivery_mode=2) diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index 7a1e0668aa58c53ab26e64870876686635e33181..4e302a7c4f116bb3f6ccbc1faea821b6b4172c34 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -140,7 +140,7 @@ def index_docs(channel, method, properties, body): print(" [x] json body : ", json_body) - channel.basic_publish(exchange=exchange, + channel.basic_publish(exchange=exchange_logs_name, routing_key=routing_key_logs, body=json_body, properties=pika.BasicProperties(delivery_mode=2) @@ -203,7 +203,7 @@ def index_docs(channel, method, properties, body): # session_id=cfg['session']['id'], uuid=cfg['session']['current_uuid'], step='doc-indexer', - status='terminated', + status='done', uuid_prefix='meta', info='no info' ) @@ -212,7 +212,7 @@ def index_docs(channel, method, properties, body): print(" [x] json body : ", json_body) - channel.basic_publish(exchange=exchange, + channel.basic_publish(exchange=exchange_logs_name, routing_key=routing_key_logs, body=json_body, properties=pika.BasicProperties(delivery_mode=2) diff --git a/workers/doc-processor-oo.py b/workers/doc-processor-oo.py new file mode 100644 index 
import pika
import msgpack
import datetime
import re
import json
import os, sys

# Make the project root importable so the lib/ package can be found.
fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
newPath = os.path.join(parentDir)
sys.path.append(newPath)
from lib.flatten_utils import flatten_json, unflatten_json
from lib.type_utils import convert_to_datetime, convert_to_float, convert_to_int, convert_to_str, convert_to_boolean
from lib.exit_gracefully import exit_gracefully
from lib.my_logging import logging
from lib.serializers import decode_datetime

from lib.rabbit_session import RabbitSession
from lib.log_message import LogMessage


class DocProcessor:
    """Worker that consumes pages of documents from RabbitMQ, casts their
    'data-fr' properties to the types recorded in field_types.json, and
    republishes the fixed documents to the docs-to-index queue.

    Progress and termination events are reported through the session-log
    exchange via ``RabbitSession.publish_log``.
    """

    def __init__(self, cfg):
        # cfg: flat dict built in __main__ (rabbitmq_host, rabbitmq_queue, ...)
        self.cfg = cfg
        # RabbitSession instance; bound inside main() for the lifetime of the
        # connection, used by process_docs() to publish tasks and logs.
        self.rabbit = None

    def fix_field_types(self, in_docs, out_types):
        """Return a copy of *in_docs* with 'data-fr' property values cast to
        the types listed in *out_types*.

        Args:
            in_docs: list of document dicts; docs without a 'data-fr' key are
                passed through unchanged.
            out_types: mapping of flattened property path -> type name
                ('str', 'int', 'float', 'date', 'datetime', 'bool').

        Returns:
            list of documents with converted properties; null-ish values
            ('', '-', 'None', None) are dropped, and plain-string 'address'
            properties are wrapped into a schema.org-style dict.
        """
        out_docs = []

        for in_doc in in_docs:

            # Documents without a data payload pass through untouched.
            if 'data-fr' not in in_doc:
                out_docs.append(in_doc)
                continue

            in_flattened_properties = flatten_json(in_doc['data-fr']['properties'])
            out_flattened_properties = in_flattened_properties.copy()

            for prop in in_flattened_properties.keys():

                # Remove null-ish values so they do not pollute the index.
                the_value = in_flattened_properties[prop]
                if the_value is None or (type(the_value) is str and the_value.strip() in ('', '-', 'None')):
                    del out_flattened_properties[prop]
                    continue

                # LOOKUP, ex.: the type of a field named "thefield.12.thesubfield"
                # can be found in the catalog by looking for "thefield.0.thesubfield"
                lookup_key = re.sub(r'\.\d+', '.0', prop)
                # .get() instead of [] so a missing key is reported by the
                # 'not supported' branch instead of raising KeyError here.
                field_type = out_types.get(lookup_key)

                if field_type == 'str':
                    out_flattened_properties[prop] = convert_to_str(the_value)
                elif field_type == 'int':
                    out_flattened_properties[prop] = convert_to_int(the_value)
                elif field_type == 'float':
                    out_flattened_properties[prop] = convert_to_float(the_value)
                elif field_type in ['date', 'datetime']:
                    out_flattened_properties[prop] = convert_to_datetime(the_value).strftime('%Y-%m-%dT%H:%M:%SZ')
                elif field_type == 'bool':
                    out_flattened_properties[prop] = convert_to_boolean(the_value)
                else:
                    # BUGFIX: was logging out_types[prop], which raised KeyError
                    # (prop may contain numeric indices) instead of reporting
                    # the unsupported or missing type.
                    logging.critical('type %s not supported', field_type)
                    sys.exit(1)

            out_doc = in_doc.copy()
            out_doc['data-fr']['properties'] = unflatten_json(out_flattened_properties)

            # Amending addresses which are not in the schema.org format; we use
            # out_doc in order to avoid reintroducing null values!
            if 'address' in out_doc['data-fr']['properties'].keys() and type(out_doc['data-fr']['properties']['address']) is str:
                the_street_address = in_doc['data-fr']['properties']['address']
                out_doc['data-fr']['properties']['address'] = {'streetAddress': the_street_address}
                logging.debug(out_doc['data-fr']['properties']['address'])

            out_docs.append(out_doc)

        return out_docs

    def process_docs(self, channel, method, properties, body):
        """RabbitMQ callback: type-fix one page of documents and forward it.

        Decodes the msgpack message, loads field_types.json from the session
        working directory, converts the docs, publishes them as a task on the
        docs-to-index queue and emits progress/termination log messages.
        """
        decoded_body = msgpack.unpackb(body, raw=False, object_hook=decode_datetime)

        cfg = decoded_body['header']['cfg']
        progress_ratio = decoded_body['header']['progress_ratio']
        count = decoded_body['header']['count']
        docs = decoded_body['body']

        # Raises FileNotFoundError until the type-catalog worker has produced
        # the file; the retry loop in __main__ handles that case.
        filename = os.path.join(parentDir, cfg['session']['working_directory'], 'field_types.json')
        with open(filename, 'r') as fp:
            field_types = json.load(fp)

        exchange = cfg['rabbitmq']['exchange']
        # The queue this program will be writing to.
        docs_to_index_qn = cfg['rabbitmq']['queue_name_2']
        docs_to_index_rk = cfg['rabbitmq']['routing_key_2']
        logging.debug('docs_to_index_qn: %s', docs_to_index_qn)

        # Consume these keys so downstream workers receive a clean config.
        del cfg['rabbitmq']['queue_name_2']
        del cfg['rabbitmq']['routing_key_2']

        logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio * 100, len(docs), docs[0]['slug']))

        progress_rounded = round(progress_ratio * 100)
        # NOTE(review): this fires for every percentage that is NOT a multiple
        # of 10; 'progress_rounded % 10 == 0' may have been intended — confirm.
        if progress_rounded % 10:
            # BUGFIX: added the missing space before 'docs' in the message.
            info_message = '[' + str(progress_rounded) + '%] Sending ' + str(len(docs)) + ' docs to RabbitMQ for dataset ' + docs[0]['slug'] + '...'
            logging.debug('info_message: %s', info_message)
            # ---------------------- send log ----------------------------
            # BUGFIX: decoded_body['body'] is the LIST of docs, so indexing it
            # with 'metadata-fr' raised TypeError; use the first doc's title.
            log_message = LogMessage(session_id=cfg['session']['id'],
                                     uuid=cfg['session']['current_uuid'],
                                     step='doc-processor',
                                     status=docs[0]['metadata-fr']['title'],
                                     uuid_prefix='meta',
                                     info=info_message
                                     )
            self.rabbit.publish_log(log_message=log_message.__dict__)
            # ------------------------------------------------------------

        docs_to_index = self.fix_field_types(docs, field_types)

        msg = dict()
        msg['header'] = dict()
        msg['header']['cfg'] = cfg
        msg['header']['progress_ratio'] = progress_ratio
        msg['header']['count'] = count
        msg['body'] = docs_to_index

        the_task_body = msgpack.packb(msg, use_bin_type=True)
        # ------------------------ send task -------------------------
        self.rabbit.publish_task(the_body=the_task_body,
                                 exchange=exchange,
                                 routing_key=docs_to_index_rk,
                                 queue_name=docs_to_index_qn)
        # ------------------------------------------------------------

        logging.info('...done!')
        # ---------------------- send log ----------------------------
        log_message = LogMessage(session_id=cfg['session']['id'],
                                 uuid=cfg['session']['current_uuid'],
                                 step='doc-processor',
                                 status='Terminated',
                                 uuid_prefix='meta',
                                 info='no info'
                                 )
        self.rabbit.publish_log(log_message=log_message.__dict__)
        # ------------------------------------------------------------
        return

    def main(self):
        """Open a RabbitSession and consume the doc-pages queue forever."""
        with RabbitSession(self.cfg) as self.rabbit:
            # BUGFIX: was reading the module-level 'cfg' global, which only
            # worked when run as a script; use the instance's own config.
            docs_to_process_qn = self.cfg['rabbitmq_queue']
            self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.process_docs,
                                                                 specific_queue=docs_to_process_qn)


if __name__ == '__main__':

    import time
    import signal
    import yaml
    import argparse

    signal.signal(signal.SIGINT, exit_gracefully)

    parser = argparse.ArgumentParser(description='Document processor')
    parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, default='rabbitmq')
    parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
    parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str,
                        default='download_data_grandlyon_com_index')
    parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, default='doc_pages_to_process')
    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])

    args = parser.parse_args()

    cfg = dict()
    cfg['rabbitmq'] = dict()
    cfg['rabbitmq_host'] = args.host
    cfg['rabbitmq_port'] = args.port
    cfg['rabbitmq_exchange'] = args.exchange
    cfg['rabbitmq_queue'] = args.queue
    # NOTE(review): hard-coded credentials/log-queue names; consider moving
    # them to CLI arguments or environment variables.
    cfg['rabbitmq']['user'] = 'admin'
    cfg['rabbitmq']['password'] = 'admin'
    cfg['rabbitmq']['queue_logs_name'] = 'session_logs'
    cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key'
    cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs'

    logging.getLogger().setLevel(args.loglevel)
    logging.info('Starting...')

    while True:
        try:
            DocProcessor(cfg).main()
        except FileNotFoundError:
            logging.error('The field types file is not ready yet. Retrying in 5 seconds...')
            time.sleep(5)
        except pika.exceptions.ChannelClosed:
            logging.info("Waiting for tasks...")
            time.sleep(5)
        except pika.exceptions.AMQPConnectionError:
            logging.info('Waiting for RabbitMQ to be reachable...')
            time.sleep(5)
        except Exception as e:
            logging.error(e)
            time.sleep(5)
            sys.exit(1)
import pika
import msgpack
import datetime
import re
import json
import os, sys

# Make the project root importable so the lib/ package can be found.
fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
newPath = os.path.join(parentDir)
sys.path.append(newPath)
from lib.flatten_utils import flatten_json, unflatten_json
from lib.type_utils import convert_to_datetime, convert_to_float, convert_to_int, convert_to_str, convert_to_boolean
from lib.exit_gracefully import exit_gracefully
from lib.my_logging import logging
from lib.serializers import decode_datetime
from lib.log_message import LogMessage


def fix_field_types(in_docs, out_types):
    """Return a copy of *in_docs* with 'data-fr' property values cast to the
    types listed in *out_types*.

    Args:
        in_docs: list of document dicts; docs without a 'data-fr' key are
            passed through unchanged.
        out_types: mapping of flattened property path -> type name
            ('str', 'int', 'float', 'date', 'datetime', 'bool').

    Returns:
        list of documents with converted properties; null-ish values
        ('', '-', 'None', None) are dropped, and plain-string 'address'
        properties are wrapped into a schema.org-style dict.
    """
    out_docs = []

    for in_doc in in_docs:

        # Documents without a data payload pass through untouched.
        if 'data-fr' not in in_doc:
            out_docs.append(in_doc)
            continue

        in_flattened_properties = flatten_json(in_doc['data-fr']['properties'])
        out_flattened_properties = in_flattened_properties.copy()

        for prop in in_flattened_properties.keys():

            # Remove null-ish values so they do not pollute the index.
            the_value = in_flattened_properties[prop]
            if the_value is None or (type(the_value) is str and the_value.strip() in ('', '-', 'None')):
                del out_flattened_properties[prop]
                continue

            # LOOKUP, ex.: the type of a field named "thefield.12.thesubfield"
            # can be found in the catalog by looking for "thefield.0.thesubfield"
            lookup_key = re.sub(r'\.\d+', '.0', prop)
            # .get() instead of [] so a missing key is reported by the
            # 'not supported' branch instead of raising KeyError here.
            field_type = out_types.get(lookup_key)

            if field_type == 'str':
                out_flattened_properties[prop] = convert_to_str(the_value)
            elif field_type == 'int':
                out_flattened_properties[prop] = convert_to_int(the_value)
            elif field_type == 'float':
                out_flattened_properties[prop] = convert_to_float(the_value)
            elif field_type in ['date', 'datetime']:
                out_flattened_properties[prop] = convert_to_datetime(the_value).strftime('%Y-%m-%dT%H:%M:%SZ')
            elif field_type == 'bool':
                out_flattened_properties[prop] = convert_to_boolean(the_value)
            else:
                # BUGFIX: was logging out_types[prop], which raised KeyError
                # (prop may contain numeric indices) instead of reporting the
                # unsupported or missing type.
                logging.critical('type %s not supported', field_type)
                sys.exit(1)

        out_doc = in_doc.copy()
        out_doc['data-fr']['properties'] = unflatten_json(out_flattened_properties)

        # Amending addresses which are not in the schema.org format; we use
        # out_doc in order to avoid reintroducing null values!
        if 'address' in out_doc['data-fr']['properties'].keys() and type(out_doc['data-fr']['properties']['address']) is str:
            the_street_address = in_doc['data-fr']['properties']['address']
            out_doc['data-fr']['properties']['address'] = {'streetAddress': the_street_address}
            logging.debug(out_doc['data-fr']['properties']['address'])

        out_docs.append(out_doc)

    return out_docs


def process_docs(channel, method, properties, body):
    """RabbitMQ callback: type-fix one page of documents and forward it.

    Decodes the msgpack message, loads field_types.json from the session
    working directory, converts the docs, publishes them on the docs-to-index
    queue and emits progress/termination messages on the log exchange.
    Acks the incoming delivery only after everything has been published.
    """
    decoded_body = msgpack.unpackb(body, raw=False, object_hook=decode_datetime)

    cfg = decoded_body['header']['cfg']
    progress_ratio = decoded_body['header']['progress_ratio']
    count = decoded_body['header']['count']
    docs = decoded_body['body']

    # Raises FileNotFoundError until the type-catalog worker has produced the
    # file; the retry loop in __main__ handles that case.
    filename = os.path.join(parentDir, cfg['session']['working_directory'], 'field_types.json')
    with open(filename, 'r') as fp:
        field_types = json.load(fp)

    exchange = cfg['rabbitmq']['exchange']
    # The queue this program will be writing to.
    docs_to_index_qn = cfg['rabbitmq']['queue_name_2']
    docs_to_index_rk = cfg['rabbitmq']['routing_key_2']
    queue_logs_name = cfg['rabbitmq']['queue_logs_name']
    routing_key_logs = cfg['rabbitmq']['routing_key_logs']
    exchange_logs_name = cfg['rabbitmq']['exchange_logs_name']

    # Consume these keys so downstream workers receive a clean config.
    del cfg['rabbitmq']['queue_name_2']
    del cfg['rabbitmq']['routing_key_2']

    channel.exchange_declare(exchange=exchange, exchange_type='direct')
    channel.queue_declare(queue=docs_to_index_qn, durable=True)
    channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk)

    channel.queue_declare(queue=queue_logs_name, durable=True)
    channel.queue_bind(exchange=exchange_logs_name, queue=queue_logs_name, routing_key=routing_key_logs)

    logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio * 100, len(docs), docs[0]['slug']))

    progress_rounded = round(progress_ratio * 100)
    # NOTE(review): this fires for every percentage that is NOT a multiple of
    # 10; 'progress_rounded % 10 == 0' may have been intended — confirm.
    if progress_rounded % 10:
        # BUGFIX: added the missing space before 'docs' in the message.
        status_message = '[' + str(progress_rounded) + '%] Sending ' + str(len(docs)) + ' docs to RabbitMQ for dataset ' + str(docs[0]['slug']) + '...'
        # ---------------------- send log ----------------------------
        log_message = LogMessage(session_id=cfg['session']['id'],
                                 uuid=cfg['session']['current_uuid'],
                                 step='doc-processor',
                                 status=status_message,
                                 uuid_prefix='meta',
                                 info='no info'
                                 )

        # BUGFIX: json.dumps(log_message) raised TypeError because LogMessage
        # instances are not JSON-serializable; serialize the attribute dict
        # (the same payload shape the other workers publish).
        json_body = json.dumps(log_message.__dict__)

        print(" [x] json body : ", json_body)

        channel.basic_publish(exchange=exchange_logs_name,
                              routing_key=routing_key_logs,
                              body=json_body,
                              properties=pika.BasicProperties(delivery_mode=2)
                              )
        # ------------------------------------------------------------

    docs_to_index = fix_field_types(docs, field_types)

    msg = dict()
    msg['header'] = dict()
    msg['header']['cfg'] = cfg
    msg['header']['progress_ratio'] = progress_ratio
    msg['header']['count'] = count
    msg['body'] = docs_to_index

    the_body = msgpack.packb(msg, use_bin_type=True)

    # delivery_mode=2 makes the message persistent on the durable queue.
    channel.basic_publish(exchange=exchange,
                          routing_key=docs_to_index_rk,
                          body=the_body,
                          properties=pika.BasicProperties(delivery_mode=2)
                          )

    logging.info('...done!')
    # ---------------------- send log ----------------------------
    log_message = LogMessage(session_id=cfg['session']['id'],
                             uuid=cfg['session']['current_uuid'],
                             step='doc-processor',
                             status='done',
                             uuid_prefix='meta',
                             info='no info'
                             )

    # BUGFIX: serialize the attribute dict, not the instance (see above).
    json_body = json.dumps(log_message.__dict__)

    print(" [x] json body : ", json_body)

    channel.basic_publish(exchange=exchange_logs_name,
                          routing_key=routing_key_logs,
                          body=json_body,
                          properties=pika.BasicProperties(delivery_mode=2)
                          )
    # ------------------------------------------------------------
    channel.basic_ack(delivery_tag=method.delivery_tag)

    return


def main(cfg):
    """Connect to RabbitMQ and consume the doc-pages queue forever.

    Blocks inside start_consuming(); pika exceptions propagate to the retry
    loop in __main__.
    """
    # BUGFIX: the required --user/--password arguments were collected but
    # never used; authenticate with them instead of pika's default guest
    # credentials (which only work on localhost).
    credentials = pika.PlainCredentials(cfg['rabbitmq']['user'], cfg['rabbitmq']['password'])
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'],
                                                                   port=cfg['rabbitmq_port'],
                                                                   credentials=credentials))

    channel = connection.channel()

    # The queue this program will be consuming messages from.
    doc_pages_to_process_qn = cfg['rabbitmq_queue']

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_message_callback=process_docs,
                          queue=doc_pages_to_process_qn)
    channel.start_consuming()

    connection.close()


if __name__ == '__main__':

    import time
    import signal
    import yaml
    import argparse

    signal.signal(signal.SIGINT, exit_gracefully)

    parser = argparse.ArgumentParser(description='Document processor')
    parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True)
    parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
    parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True)
    parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True)
    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
    parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True)
    parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True)

    args = parser.parse_args()

    cfg = dict()
    # BUGFIX: the nested dict must exist before assigning into it, otherwise
    # cfg['rabbitmq']['user'] = ... raises KeyError at startup.
    cfg['rabbitmq'] = dict()
    cfg['rabbitmq_host'] = args.host
    cfg['rabbitmq_port'] = args.port
    cfg['rabbitmq_exchange'] = args.exchange
    cfg['rabbitmq_queue'] = args.queue
    cfg['rabbitmq']['user'] = args.user
    cfg['rabbitmq']['password'] = args.password

    logging.getLogger().setLevel(args.loglevel)
    logging.info('Starting...')

    while True:
        try:
            main(cfg)
        except FileNotFoundError:
            logging.error('The field types file is not ready yet. Retrying in 5 seconds...')
            time.sleep(5)
        except pika.exceptions.ChannelClosed:
            logging.info("Waiting for tasks...")
            time.sleep(5)
        except pika.exceptions.AMQPConnectionError:
            logging.info('Waiting for RabbitMQ to be reachable...')
            time.sleep(5)
        except Exception as e:
            logging.error(e)
            time.sleep(5)
            # BUGFIX: exit() is an interactive helper; use sys.exit().
            sys.exit(1)