diff --git a/lib/mongo_session.py b/lib/mongo_session.py index 2a4b32622929558311b0a0f5e0af3e5513297530..765194cbe293c24aec53435a5c93305917251734 100644 --- a/lib/mongo_session.py +++ b/lib/mongo_session.py @@ -56,6 +56,8 @@ class MongoSession: "loglevel": body_object["loglevel"]}) except Exception as exc: print('[ERROR saving log]:', exc) + ch.basic_nack(delivery_tag=method.delivery_tag, requeue=1) + ch.basic_ack(delivery_tag=method.delivery_tag) def read_mongo_log(self, this_session_id): diff --git a/lib/rabbit_session.py b/lib/rabbit_session.py index 65c8fcd47a8bb74d9cfa899952cc905c4460910d..8b9103b079a42955acf43b2968a91996b38c50d6 100644 --- a/lib/rabbit_session.py +++ b/lib/rabbit_session.py @@ -6,10 +6,6 @@ fileDir = os.path.dirname(os.path.abspath(__file__)) parentDir = os.path.dirname(fileDir) newPath = os.path.join(parentDir) sys.path.append(newPath) -try: - from lib.mongo_session import MongoSession -except ImportError: - from mongo_session import MongoSession class RabbitSession: @@ -38,22 +34,6 @@ class RabbitSession: credentials=credentials) return pika.BlockingConnection(parameters) - def consume_logs_and_save_in_mongo(self): - print('consume_logs_and_save_in_mongo') - self.channel = self.connection.channel() - # the queue this program will be consuming messages from - queue_logs_name = self.config['rabbitmq']['queue_logs_name'] - self.channel.basic_qos(prefetch_count=1) - self.channel.basic_consume(on_message_callback=lambda ch, - method, - properties, - body: MongoSession(self.config).save_log_in_mongo(ch, - method, - body), - queue=queue_logs_name, auto_ack=True)#, no_ack=True) - - self.channel.start_consuming() - def consume_queue_and_launch_specific_method(self, specific_method, specific_queue): print('consume_queue_and_launch_specific_method') self.channel = self.connection.channel() diff --git a/workers/process-logger.py b/workers/process-logger.py index 0bfe16e7a8c98aea8e0842621118f1ff9b5c13fd..5eeea5d2943c0d587cb1fb213d33d437383f2b5e 100644 --- a/workers/process-logger.py +++ b/workers/process-logger.py @@ -1,13 +1,15 @@ import pika -import os, sys +import os +import sys from elasticsearch.exceptions import AuthorizationException fileDir = os.path.dirname(os.path.abspath(__file__)) parentDir = os.path.dirname(fileDir) newPath = os.path.join(parentDir) sys.path.append(newPath) -from lib.rabbit_session import RabbitSession -from pathlib import Path +from lib.mongo_session import MongoSession + + try: from types import SimpleNamespace as Namespace except ImportError: @@ -20,30 +22,45 @@ newPath = os.path.join(parentDir) sys.path.append(newPath) -class ProcessLogger: - def __init__(self, cfg): - self.cfg = cfg +def main(cfg): + # from lib.close_connection import on_timeout + credentials = pika.PlainCredentials(username=cfg['rabbitmq']['user'], password=cfg['rabbitmq']['password']) + + connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'], + port=cfg['rabbitmq']['port'], + credentials=credentials)) + + channel = connection.channel() + + # the queue this program will be consuming messages from + queue_logs_name = cfg['rabbitmq']['queue_logs_name'] - def main(self): - with RabbitSession(cfg) as rabbit: - rabbit.consume_logs_and_save_in_mongo() + channel.basic_qos(prefetch_count=1) + channel.basic_consume( + on_message_callback=lambda ch, method, properties, body: MongoSession(cfg).save_log_in_mongo(ch, + method, + body), + + queue=queue_logs_name) + channel.start_consuming() + + # connection.close() if __name__ == '__main__': import time - import signal import argparse parser = argparse.ArgumentParser(description='Document indexer') - parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, + choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) parser.add_argument('--mongouser', dest='mongouser', help='the mongo user login', type=str, required=True) - parser.add_argument('--mongopassword', dest='mongopassword', help='the mongo user password', type=str, required=True) + parser.add_argument('--mongopassword', dest='mongopassword', help='the mongo user password', type=str, + required=True) args = parser.parse_args() - # logging.info('Starting...') - # TODO the config below should be extracted from the config.yaml file cfg = dict() cfg['rabbitmq'] = dict() cfg['mongo'] = dict() @@ -62,16 +79,9 @@ if __name__ == '__main__': cfg['mongo']['password'] = args.mongopassword cfg['mongo']['collection'] = 'indexer_logs' - # logging.getLogger().setLevel(args.loglevel) - # logging.info('Starting...') - # - # signal.signal(signal.SIGINT, exit_gracefully) - while True: - try: - my_process_logger = ProcessLogger(cfg) - my_process_logger.main() + main(cfg) except pika.exceptions.ChannelClosed: print('Waiting for tasks...') # logging.info("Waiting for tasks...")