diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py
index 92533949d05e02a793c48a13f0303f52ee4549a5..f183ac6afd124f233f51f474dd06d7d116536177 100644
--- a/workers/doc-enricher.py
+++ b/workers/doc-enricher.py
@@ -240,29 +240,28 @@ def enrich_docs( channel, method, properties, body ):
     logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug']))
     progress_rounded = round(progress_ratio * 100)

-    if progress_rounded % 10:
-        status_message = '[' + str(progress_rounded) + '%] Sending ' + str(
-            len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...'
-        # ---------------------- send log ----------------------------
-        log_message = LogMessage(session_id=cfg['session']['id'],
-                                 # session_id=cfg['session']['id'],
-                                 uuid=cfg['session']['current_uuid'],
-                                 step='doc-enricher',
-                                 status=status_message,
-                                 uuid_prefix='meta',
-                                 info='no info'
-                                 )
+    status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str(
+        len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...'
+    # ---------------------- send log ----------------------------
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             # session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='doc-enricher',
+                             status=status_message,
+                             uuid_prefix='meta',
+                             info='no info'
+                             )

-        json_body = json.dumps(log_message.__dict__)
+    json_body = json.dumps(log_message.__dict__)

-        print(" [x] json body : ", json_body)
+    print(" [x] json body : ", json_body)

-        channel.basic_publish(exchange=exchange_logs_name,
-                              routing_key=routing_key_logs,
-                              body=json_body,
-                              properties=pika.BasicProperties(delivery_mode=2)
-                              )
-        # ------------------------------------------------------------
+    channel.basic_publish(exchange=exchange_logs_name,
+                          routing_key=routing_key_logs,
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------

     msg = dict()
     msg['header'] = dict()
@@ -280,26 +279,28 @@ def enrich_docs( channel, method, properties, body ):
                           )
     #logging.info('...done!')

-    # ---------------------- send log ----------------------------
-    log_message = LogMessage(session_id=cfg['session']['id'],
-                             # session_id=cfg['session']['id'],
-                             uuid=cfg['session']['current_uuid'],
-                             step='doc-enricher',
-                             status='done',
-                             uuid_prefix='meta',
-                             info='no info'
-                             )
+    if progress_rounded == 100:

-    json_body = json.dumps(log_message.__dict__)
+        # ---------------------- send log ----------------------------
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 # session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='doc-enricher',
+                                 status='done',
+                                 uuid_prefix='meta',
+                                 info='no info'
+                                 )

-    print(" [x] json body : ", json_body)
+        json_body = json.dumps(log_message.__dict__)

-    channel.basic_publish(exchange=exchange_logs_name,
-                          routing_key=routing_key_logs,
-                          body=json_body,
-                          properties=pika.BasicProperties(delivery_mode=2)
-                          )
-    # ------------------------------------------------------------
+        print(" [x] json body : ", json_body)
+
+        channel.basic_publish(exchange=exchange_logs_name,
+                              routing_key=routing_key_logs,
+                              body=json_body,
+                              properties=pika.BasicProperties(delivery_mode=2)
+                              )
+        # ------------------------------------------------------------
     # except TypeError: # it means that getWFS returned None
     #     pass
     # #
diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py
index 74e400383a3382379273bdbb2b2f824e63f26639..193aa74882ee0adaf09c40c15bec5e836b41d8bd 100644
--- a/workers/doc-indexer.py
+++ b/workers/doc-indexer.py
@@ -123,29 +123,28 @@ def index_docs(channel, method, properties, body):
     logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug']))
     progress_rounded = round(progress_ratio * 100)

-    if progress_rounded % 10:
-        status_message = '[' + str(progress_rounded) + '%] Sending ' + str(
-            len(docs_to_index)) + 'docs to RabbitMQ for dataset ' + str(docs_to_index[0]['slug']) + '...'
-        # ---------------------- send log ----------------------------
-        log_message = LogMessage(session_id=cfg['session']['id'],
-                                 # session_id=cfg['session']['id'],
-                                 uuid=cfg['session']['current_uuid'],
-                                 step='doc-indexer',
-                                 status=status_message,
-                                 uuid_prefix='meta',
-                                 info='no info'
-                                 )
+    status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str(
+        len(docs_to_index)) + ' docs to RabbitMQ for dataset ' + str(docs_to_index[0]['slug']) + '...'
+    # ---------------------- send log ----------------------------
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             # session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='doc-indexer',
+                             status=status_message,
+                             uuid_prefix='meta',
+                             info='no info'
+                             )

-        json_body = json.dumps(log_message.__dict__)
+    json_body = json.dumps(log_message.__dict__)

-        print(" [x] json body : ", json_body)
+    print(" [x] json body : ", json_body)

-        channel.basic_publish(exchange=exchange_logs_name,
-                              routing_key=routing_key_logs,
-                              body=json_body,
-                              properties=pika.BasicProperties(delivery_mode=2)
-                              )
-        # ------------------------------------------------------------
+    channel.basic_publish(exchange=exchange_logs_name,
+                          routing_key=routing_key_logs,
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------

     es_body = ''

@@ -198,26 +197,28 @@ def index_docs(channel, method, properties, body):

     #time.sleep(5)

-    # ---------------------- send log ----------------------------
-    log_message = LogMessage(session_id=cfg['session']['id'],
-                             # session_id=cfg['session']['id'],
-                             uuid=cfg['session']['current_uuid'],
-                             step='doc-indexer',
-                             status='done',
-                             uuid_prefix='meta',
-                             info='no info'
-                             )
+    if progress_rounded == 100:

-    json_body = json.dumps(log_message.__dict__)
+        # ---------------------- send log ----------------------------
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 # session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='doc-indexer',
+                                 status='done',
+                                 uuid_prefix='meta',
+                                 info='no info'
+                                 )

-    print(" [x] json body : ", json_body)
+        json_body = json.dumps(log_message.__dict__)

-    channel.basic_publish(exchange=exchange_logs_name,
-                          routing_key=routing_key_logs,
-                          body=json_body,
-                          properties=pika.BasicProperties(delivery_mode=2)
-                          )
-    # ------------------------------------------------------------
+        print(" [x] json body : ", json_body)
+
+        channel.basic_publish(exchange=exchange_logs_name,
+                              routing_key=routing_key_logs,
+                              body=json_body,
+                              properties=pika.BasicProperties(delivery_mode=2)
+                              )
+        # ------------------------------------------------------------

     return
diff --git a/workers/doc-processor.py b/workers/doc-processor.py
index 8089db0baf280e5570481da795bffb29fe59e358..5be41ba9afd1aad058ef6286856113f53afb4f77 100644
--- a/workers/doc-processor.py
+++ b/workers/doc-processor.py
@@ -112,29 +112,28 @@ def process_docs( channel, method, properties, body ):
     logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug']))
     progress_rounded = round(progress_ratio * 100)

-    if progress_rounded % 10:
-        status_message = '[' + str(progress_rounded) + '%] Sending ' + str(
-            len(docs)) + 'docs to RabbitMQ for dataset ' + str(docs[0]['slug']) + '...'
-        # ---------------------- send log ----------------------------
-        log_message = LogMessage(session_id=cfg['session']['id'],
-                                 # session_id=cfg['session']['id'],
-                                 uuid=cfg['session']['current_uuid'],
-                                 step='doc-processor',
-                                 status=status_message,
-                                 uuid_prefix='meta',
-                                 info='no info'
-                                 )
+    status_message = '[' + str(progress_ratio * 100) + '%] Sending ' + str(
+        len(docs)) + ' docs to RabbitMQ for dataset ' + str(docs[0]['slug']) + '...'
+    # ---------------------- send log ----------------------------
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             # session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='doc-processor',
+                             status=status_message,
+                             uuid_prefix='meta',
+                             info='no info'
+                             )

-        json_body = json.dumps(log_message.__dict__)
+    json_body = json.dumps(log_message.__dict__)

-        print(" [x] json body : ", json_body)
+    print(" [x] json body : ", json_body)

-        channel.basic_publish(exchange=exchange_logs_name,
-                              routing_key=routing_key_logs,
-                              body=json_body,
-                              properties=pika.BasicProperties(delivery_mode=2)
-                              )
-        # ------------------------------------------------------------
+    channel.basic_publish(exchange=exchange_logs_name,
+                          routing_key=routing_key_logs,
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------

     docs_to_index = fix_field_types( docs, field_types )

@@ -156,26 +155,27 @@ def process_docs( channel, method, properties, body ):
                           )
     logging.info('...done!')

-    # ---------------------- send log ----------------------------
-    log_message = LogMessage(session_id=cfg['session']['id'],
-                             # session_id=cfg['session']['id'],
-                             uuid=cfg['session']['current_uuid'],
-                             step='doc-processor',
-                             status='done',
-                             uuid_prefix='meta',
-                             info='no info'
-                             )
+    if progress_rounded == 100:
+        # ---------------------- send log ----------------------------
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 # session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='doc-processor',
+                                 status='done',
+                                 uuid_prefix='meta',
+                                 info='no info'
+                                 )

-    json_body = json.dumps(log_message.__dict__)
+        json_body = json.dumps(log_message.__dict__)

-    print(" [x] json body : ", json_body)
+        print(" [x] json body : ", json_body)

-    channel.basic_publish(exchange=exchange_logs_name,
-                          routing_key=routing_key_logs,
-                          body=json_body,
-                          properties=pika.BasicProperties(delivery_mode=2)
-                          )
-    # ------------------------------------------------------------
+        channel.basic_publish(exchange=exchange_logs_name,
+                              routing_key=routing_key_logs,
+                              body=json_body,
+                              properties=pika.BasicProperties(delivery_mode=2)
+                              )
+        # ------------------------------------------------------------

     channel.basic_ack(delivery_tag = method.delivery_tag)
     return
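All three hunks make the same change to the RabbitMQ status logging: the per-batch progress message is no longer gated behind "if progress_rounded % 10:" (a condition that skipped multiples of ten rather than throttling), and the 'done' message is published only once progress reaches 100% instead of after every batch. The sketch below is a minimal illustration of that resulting flow, not code from the repository; report_batches() and publish_log() are hypothetical stand-ins for the workers' loop and their LogMessage/channel.basic_publish() plumbing.

# Sketch only: publish_log() stands in for building a LogMessage and
# publishing it on the logs exchange via channel.basic_publish().
def report_batches(publish_log, step, batches):
    total = len(batches)
    for i, batch in enumerate(batches, start=1):
        progress_ratio = i / total
        progress_rounded = round(progress_ratio * 100)
        # Progress is now reported for every batch (previously gated by
        # 'if progress_rounded % 10:', which skipped multiples of ten).
        publish_log(step=step,
                    status='[%s%%] Sending %i docs...' % (progress_ratio * 100, len(batch)))
        # 'done' is now sent once, when the final batch brings progress
        # to 100%, instead of after every batch.
        if progress_rounded == 100:
            publish_log(step=step, status='done')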