Skip to content
Snippets Groups Projects
Commit 2156e6ad authored by ddamiron's avatar ddamiron
Browse files

update process-logger

clean-up rabbit-session
clean-up mongo-session
parent db826248
Branches
No related tags found
No related merge requests found
......@@ -56,6 +56,8 @@ class MongoSession:
"loglevel": body_object["loglevel"]})
except Exception as exc:
print('[ERROR saving log]:', exc)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
ch.basic_ack(delivery_tag=method.delivery_tag)
def read_mongo_log(self, this_session_id):
......
......@@ -6,10 +6,6 @@ fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
newPath = os.path.join(parentDir)
sys.path.append(newPath)
try:
from lib.mongo_session import MongoSession
except ImportError:
from mongo_session import MongoSession
class RabbitSession:
......@@ -38,22 +34,6 @@ class RabbitSession:
credentials=credentials)
return pika.BlockingConnection(parameters)
def consume_logs_and_save_in_mongo(self):
print('consume_logs_and_save_in_mongo')
self.channel = self.connection.channel()
# the queue this program will be consuming messages from
queue_logs_name = self.config['rabbitmq']['queue_logs_name']
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(on_message_callback=lambda ch,
method,
properties,
body: MongoSession(self.config).save_log_in_mongo(ch,
method,
body),
queue=queue_logs_name, auto_ack=True)#, no_ack=True)
self.channel.start_consuming()
def consume_queue_and_launch_specific_method(self, specific_method, specific_queue):
print('consume_queue_and_launch_specific_method')
self.channel = self.connection.channel()
......
import pika
import os, sys
import os
import sys
from elasticsearch.exceptions import AuthorizationException
fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
newPath = os.path.join(parentDir)
sys.path.append(newPath)
from lib.rabbit_session import RabbitSession
from pathlib import Path
from lib.mongo_session import MongoSession
try:
from types import SimpleNamespace as Namespace
except ImportError:
......@@ -20,30 +22,45 @@ newPath = os.path.join(parentDir)
sys.path.append(newPath)
class ProcessLogger:
def __init__(self, cfg):
self.cfg = cfg
def main(cfg):
# from lib.close_connection import on_timeout
credentials = pika.PlainCredentials(username=cfg['rabbitmq']['user'], password=cfg['rabbitmq']['password'])
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host'],
port=cfg['rabbitmq']['port'],
credentials=credentials))
channel = connection.channel()
# the queue this program will be consuming messages from
queue_logs_name = cfg['rabbitmq']['queue_logs_name']
def main(self):
with RabbitSession(cfg) as rabbit:
rabbit.consume_logs_and_save_in_mongo()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
on_message_callback=lambda ch, method, properties, body: MongoSession(cfg).save_log_in_mongo(ch,
method,
body),
queue=queue_logs_name)
channel.start_consuming()
# connection.close()
if __name__ == '__main__':
import time
import signal
import argparse
parser = argparse.ArgumentParser(description='Document indexer')
parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True)
parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True)
parser.add_argument('--mongouser', dest='mongouser', help='the mongo user login', type=str, required=True)
parser.add_argument('--mongopassword', dest='mongopassword', help='the mongo user password', type=str, required=True)
parser.add_argument('--mongopassword', dest='mongopassword', help='the mongo user password', type=str,
required=True)
args = parser.parse_args()
# logging.info('Starting...')
# TODO the config below should be extracted from the config.yaml file
cfg = dict()
cfg['rabbitmq'] = dict()
cfg['mongo'] = dict()
......@@ -62,16 +79,9 @@ if __name__ == '__main__':
cfg['mongo']['password'] = args.mongopassword
cfg['mongo']['collection'] = 'indexer_logs'
# logging.getLogger().setLevel(args.loglevel)
# logging.info('Starting...')
#
# signal.signal(signal.SIGINT, exit_gracefully)
while True:
try:
my_process_logger = ProcessLogger(cfg)
my_process_logger.main()
main(cfg)
except pika.exceptions.ChannelClosed:
print('Waiting for tasks...')
# logging.info("Waiting for tasks...")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment