From e3a1083fc471c4863dc2429637c51c413279862a Mon Sep 17 00:00:00 2001 From: ddamiron <ddamiron@sii.fr> Date: Tue, 2 Jul 2019 10:03:43 +0200 Subject: [PATCH] add loglevel in mongo log message --- api.py | 2 +- workers/doc-enricher.py | 29 ++++++++++++++++------------- workers/doc-indexer.py | 7 ++++--- workers/doc-processor.py | 5 +++-- workers/reindexer.py | 3 ++- workers/sample-generator.py | 2 +- 6 files changed, 27 insertions(+), 21 deletions(-) diff --git a/api.py b/api.py index a02270a..b0b57d6 100644 --- a/api.py +++ b/api.py @@ -88,7 +88,7 @@ def _main(the_uuid): executor.submit(main, cfg) - return jsonify({'id': this_session_id}), 200 + return jsonify({'session_id': this_session_id}), 200 # try: # main(cfg) diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py index 9ec2d39..8af1baf 100644 --- a/workers/doc-enricher.py +++ b/workers/doc-enricher.py @@ -152,7 +152,7 @@ def old_enrich_docs( channel, method, properties, body, **kwargs ): routing_key=kwargs['docs_to_enrich_rk'], body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) if feature_page != None: @@ -171,14 +171,14 @@ def old_enrich_docs( channel, method, properties, body, **kwargs ): routing_key=kwargs['doc_pages_to_store_in_mongo_rk'], body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) channel.basic_publish( exchange=kwargs['exchange'], routing_key=kwargs['doc_pages_to_process_rk'], body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) logging.info('...done!') @@ -240,8 +240,11 @@ def enrich_docs( channel, method, properties, body ): logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug'])) progress_rounded = round(progress_ratio * 100) - status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str( - len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...' + # status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str( + # len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...' + status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio*100, + len(doc_page), + doc_page[0]['slug']) # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], @@ -277,7 +280,7 @@ def enrich_docs( channel, method, properties, body ): routing_key=doc_pages_to_process_rk, body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) #logging.info('...done!') if progress_rounded == 100: @@ -334,13 +337,13 @@ def main(cfg): channel.basic_qos(prefetch_count=1) channel.basic_consume(on_message_callback=lambda ch, method, properties, body: - enrich_docs(ch, method, properties, body), - #doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk, - #docs_to_enrich_rk=docs_to_enrich_rk, - #doc_pages_to_process_rk=doc_pages_to_process_rk, - #features_per_page=cfg['wfs']['features_per_page'], - #postgis_cfg=cfg['postgis']), - queue=docs_to_enrich_qn)#, no_ack=True) + enrich_docs(ch, method, properties, body), + #doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk, + #docs_to_enrich_rk=docs_to_enrich_rk, + #doc_pages_to_process_rk=doc_pages_to_process_rk, + #features_per_page=cfg['wfs']['features_per_page'], + #postgis_cfg=cfg['postgis']), + queue=docs_to_enrich_qn)#, no_ack=True) channel.start_consuming() connection.close() diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index 6e9e5e7..4c9189c 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -123,8 +123,9 @@ def index_docs(channel, method, properties, body): logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug'])) progress_rounded = round(progress_ratio * 100) - status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str( - len(docs_to_index)) + ' docs to RabbitMQ for dataset ' + str(docs_to_index[0]['slug']) + '...' + status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio * 100, + len(docs_to_index), + docs_to_index[0]['slug']) # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], @@ -264,7 +265,7 @@ if __name__ == '__main__': parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, - argu=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) diff --git a/workers/doc-processor.py b/workers/doc-processor.py index b121f9e..5ce3246 100644 --- a/workers/doc-processor.py +++ b/workers/doc-processor.py @@ -112,8 +112,9 @@ def process_docs( channel, method, properties, body ): logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug'])) progress_rounded = round(progress_ratio * 100) - status_message = '[' + str(progress_ratio * 100) + '%] Sending ' + str( - len(docs)) + ' docs to RabbitMQ for dataset ' + str(docs[0]['slug']) + '...' + status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio * 100, + len(docs), + docs[0]['slug']) # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], diff --git a/workers/reindexer.py b/workers/reindexer.py index f695990..3dbf746 100644 --- a/workers/reindexer.py +++ b/workers/reindexer.py @@ -150,7 +150,8 @@ def on_msg_callback(channel, method, properties, body): logging.info("Removing dataset with uuid = %s from the destination index..." % uuid) # ---------------------- send log ---------------------------- - message = 'Removing dataset with uuid: ' + str(uuid) + ' from the destination index...' + # message = 'Removing dataset with uuid: ' + str(uuid) + ' from the destination index...' + message = 'Removing dataset with uuid: {:s} from the destination index...'.format(uuid) log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], diff --git a/workers/sample-generator.py b/workers/sample-generator.py index 9e5f2a8..d1089e7 100644 --- a/workers/sample-generator.py +++ b/workers/sample-generator.py @@ -182,7 +182,7 @@ def callback(channel, method, properties, body): logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug']) # ---------------------- send log ---------------------------- - message = "Deleting already existing samples for dataset with slug = " + str(docs_to_index[0]['slug']) + message = "Deleting already existing samples for dataset with slug = {:s} ".format(docs_to_index[0]['slug']) log_message = LogMessage(session_id=cfg['session']['id'], uuid=cfg['session']['current_uuid'], step='sampler', -- GitLab