diff --git a/api.py b/api.py index a02270af13a195edebbd03951f53a82be904aff6..b0b57d6a480e97c52959fe5ea6e6350caed7a45f 100644 --- a/api.py +++ b/api.py @@ -88,7 +88,7 @@ def _main(the_uuid): executor.submit(main, cfg) - return jsonify({'id': this_session_id}), 200 + return jsonify({'session_id': this_session_id}), 200 # try: # main(cfg) diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py index 9ec2d39c83bbe64427a37e6dbd97b046aa9203d9..8af1bafbe43c929a2c437596f2381ba2c765326d 100644 --- a/workers/doc-enricher.py +++ b/workers/doc-enricher.py @@ -152,7 +152,7 @@ def old_enrich_docs( channel, method, properties, body, **kwargs ): routing_key=kwargs['docs_to_enrich_rk'], body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) if feature_page != None: @@ -171,14 +171,14 @@ def old_enrich_docs( channel, method, properties, body, **kwargs ): routing_key=kwargs['doc_pages_to_store_in_mongo_rk'], body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) channel.basic_publish( exchange=kwargs['exchange'], routing_key=kwargs['doc_pages_to_process_rk'], body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) logging.info('...done!') @@ -240,8 +240,11 @@ def enrich_docs( channel, method, properties, body ): logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug'])) progress_rounded = round(progress_ratio * 100) - status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str( - len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...' + # status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str( + # len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...' + status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio*100, + len(doc_page), + doc_page[0]['slug']) # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], @@ -277,7 +280,7 @@ def enrich_docs( channel, method, properties, body ): routing_key=doc_pages_to_process_rk, body=the_body, properties=pika.BasicProperties(delivery_mode = 2) - ) + ) #logging.info('...done!') if progress_rounded == 100: @@ -334,13 +337,13 @@ def main(cfg): channel.basic_qos(prefetch_count=1) channel.basic_consume(on_message_callback=lambda ch, method, properties, body: - enrich_docs(ch, method, properties, body), - #doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk, - #docs_to_enrich_rk=docs_to_enrich_rk, - #doc_pages_to_process_rk=doc_pages_to_process_rk, - #features_per_page=cfg['wfs']['features_per_page'], - #postgis_cfg=cfg['postgis']), - queue=docs_to_enrich_qn)#, no_ack=True) + enrich_docs(ch, method, properties, body), + #doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk, + #docs_to_enrich_rk=docs_to_enrich_rk, + #doc_pages_to_process_rk=doc_pages_to_process_rk, + #features_per_page=cfg['wfs']['features_per_page'], + #postgis_cfg=cfg['postgis']), + queue=docs_to_enrich_qn)#, no_ack=True) channel.start_consuming() connection.close() diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index 6e9e5e78346cd2940c6dc172ff0f9f7670777e7e..4c9189c782e9c98ee5bfdf13082ad4c0a962466a 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -123,8 +123,9 @@ def index_docs(channel, method, properties, body): logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug'])) progress_rounded = round(progress_ratio * 100) - status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str( - len(docs_to_index)) + ' docs to RabbitMQ for dataset ' + str(docs_to_index[0]['slug']) + '...' + status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio * 100, + len(docs_to_index), + docs_to_index[0]['slug']) # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], @@ -264,7 +265,7 @@ if __name__ == '__main__': parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, - argu=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) diff --git a/workers/doc-processor.py b/workers/doc-processor.py index b121f9e2141f1f449f50f8f28434a52d54e4c718..5ce324644cf96f55297e55649333dea8c3a583f6 100644 --- a/workers/doc-processor.py +++ b/workers/doc-processor.py @@ -112,8 +112,9 @@ def process_docs( channel, method, properties, body ): logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug'])) progress_rounded = round(progress_ratio * 100) - status_message = '[' + str(progress_ratio * 100) + '%] Sending ' + str( - len(docs)) + ' docs to RabbitMQ for dataset ' + str(docs[0]['slug']) + '...' + status_message = '[{:.2f} %] Sending {:d} docs to RabbitMQ for dataset {:s}...'.format(progress_ratio * 100, + len(docs), + docs[0]['slug']) # ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], diff --git a/workers/reindexer.py b/workers/reindexer.py index f695990eb186278a127f2cb32a127a2ff9b3db00..3dbf746d15a52db580e0d5f0ee82fe1653428069 100644 --- a/workers/reindexer.py +++ b/workers/reindexer.py @@ -150,7 +150,8 @@ def on_msg_callback(channel, method, properties, body): logging.info("Removing dataset with uuid = %s from the destination index..." % uuid) # ---------------------- send log ---------------------------- - message = 'Removing dataset with uuid: ' + str(uuid) + ' from the destination index...' + # message = 'Removing dataset with uuid: ' + str(uuid) + ' from the destination index...' + message = 'Removing dataset with uuid: {:s} from the destination index...'.format(uuid) log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], diff --git a/workers/sample-generator.py b/workers/sample-generator.py index 9e5f2a8660ee556bee65a8cde0136465ac0ce11f..d1089e7bdc0109afd3f8cc2e61ae06af4f6fc4ca 100644 --- a/workers/sample-generator.py +++ b/workers/sample-generator.py @@ -182,7 +182,7 @@ def callback(channel, method, properties, body): logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug']) # ---------------------- send log ---------------------------- - message = "Deleting already existing samples for dataset with slug = " + str(docs_to_index[0]['slug']) + message = "Deleting already existing samples for dataset with slug = {:s} ".format(docs_to_index[0]['slug']) log_message = LogMessage(session_id=cfg['session']['id'], uuid=cfg['session']['current_uuid'], step='sampler',