Skip to content
Snippets Groups Projects
Commit b7c7300c authored by ddamiron
Browse files

hotfix check done condition for doc enricher

hotfix check done condition for doc indexer
hotfix check done condition for doc processor
parent 6da66c64
No related branches found
No related tags found
No related merge requests found
...@@ -240,29 +240,28 @@ def enrich_docs( channel, method, properties, body ): ...@@ -240,29 +240,28 @@ def enrich_docs( channel, method, properties, body ):
logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug'])) logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug']))
progress_rounded = round(progress_ratio * 100) progress_rounded = round(progress_ratio * 100)
if progress_rounded % 10: status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str(
status_message = '[' + str(progress_rounded) + '%] Sending ' + str( len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...'
len(doc_page)) + 'docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...' # ---------------------- send log ----------------------------
# ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'],
log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'],
# session_id=cfg['session']['id'], uuid=cfg['session']['current_uuid'],
uuid=cfg['session']['current_uuid'], step='doc-enricher',
step='doc-enricher', status=status_message,
status=status_message, uuid_prefix='meta',
uuid_prefix='meta', info='no info'
info='no info' )
)
json_body = json.dumps(log_message.__dict__) json_body = json.dumps(log_message.__dict__)
print(" [x] json body : ", json_body) print(" [x] json body : ", json_body)
channel.basic_publish(exchange=exchange_logs_name, channel.basic_publish(exchange=exchange_logs_name,
routing_key=routing_key_logs, routing_key=routing_key_logs,
body=json_body, body=json_body,
properties=pika.BasicProperties(delivery_mode=2) properties=pika.BasicProperties(delivery_mode=2)
) )
# ------------------------------------------------------------ # ------------------------------------------------------------
msg = dict() msg = dict()
msg['header'] = dict() msg['header'] = dict()
...@@ -280,26 +279,28 @@ def enrich_docs( channel, method, properties, body ): ...@@ -280,26 +279,28 @@ def enrich_docs( channel, method, properties, body ):
) )
#logging.info('...done!') #logging.info('...done!')
# ---------------------- send log ---------------------------- if progress_rounded == 100:
log_message = LogMessage(session_id=cfg['session']['id'],
# session_id=cfg['session']['id'],
uuid=cfg['session']['current_uuid'],
step='doc-enricher',
status='done',
uuid_prefix='meta',
info='no info'
)
json_body = json.dumps(log_message.__dict__) # ---------------------- send log ----------------------------
log_message = LogMessage(session_id=cfg['session']['id'],
# session_id=cfg['session']['id'],
uuid=cfg['session']['current_uuid'],
step='doc-enricher',
status='done',
uuid_prefix='meta',
info='no info'
)
print(" [x] json body : ", json_body) json_body = json.dumps(log_message.__dict__)
channel.basic_publish(exchange=exchange_logs_name, print(" [x] json body : ", json_body)
routing_key=routing_key_logs,
body=json_body, channel.basic_publish(exchange=exchange_logs_name,
properties=pika.BasicProperties(delivery_mode=2) routing_key=routing_key_logs,
) body=json_body,
# ------------------------------------------------------------ properties=pika.BasicProperties(delivery_mode=2)
)
# ------------------------------------------------------------
# except TypeError: # it means that getWFS returned None # except TypeError: # it means that getWFS returned None
# pass # pass
# # # #
......
...@@ -123,29 +123,28 @@ def index_docs(channel, method, properties, body): ...@@ -123,29 +123,28 @@ def index_docs(channel, method, properties, body):
logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug'])) logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug']))
progress_rounded = round(progress_ratio * 100) progress_rounded = round(progress_ratio * 100)
if progress_rounded % 10: status_message = '[' + str(progress_ratio*100) + '%] Sending ' + str(
status_message = '[' + str(progress_rounded) + '%] Sending ' + str( len(docs_to_index)) + ' docs to RabbitMQ for dataset ' + str(docs_to_index[0]['slug']) + '...'
len(docs_to_index)) + 'docs to RabbitMQ for dataset ' + str(docs_to_index[0]['slug']) + '...' # ---------------------- send log ----------------------------
# ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'],
log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'],
# session_id=cfg['session']['id'], uuid=cfg['session']['current_uuid'],
uuid=cfg['session']['current_uuid'], step='doc-indexer',
step='doc-indexer', status=status_message,
status=status_message, uuid_prefix='meta',
uuid_prefix='meta', info='no info'
info='no info' )
)
json_body = json.dumps(log_message.__dict__) json_body = json.dumps(log_message.__dict__)
print(" [x] json body : ", json_body) print(" [x] json body : ", json_body)
channel.basic_publish(exchange=exchange_logs_name, channel.basic_publish(exchange=exchange_logs_name,
routing_key=routing_key_logs, routing_key=routing_key_logs,
body=json_body, body=json_body,
properties=pika.BasicProperties(delivery_mode=2) properties=pika.BasicProperties(delivery_mode=2)
) )
# ------------------------------------------------------------ # ------------------------------------------------------------
es_body = '' es_body = ''
...@@ -198,26 +197,28 @@ def index_docs(channel, method, properties, body): ...@@ -198,26 +197,28 @@ def index_docs(channel, method, properties, body):
#time.sleep(5) #time.sleep(5)
# ---------------------- send log ---------------------------- if progress_rounded == 100:
log_message = LogMessage(session_id=cfg['session']['id'],
# session_id=cfg['session']['id'],
uuid=cfg['session']['current_uuid'],
step='doc-indexer',
status='done',
uuid_prefix='meta',
info='no info'
)
json_body = json.dumps(log_message.__dict__) # ---------------------- send log ----------------------------
log_message = LogMessage(session_id=cfg['session']['id'],
# session_id=cfg['session']['id'],
uuid=cfg['session']['current_uuid'],
step='doc-indexer',
status='done',
uuid_prefix='meta',
info='no info'
)
print(" [x] json body : ", json_body) json_body = json.dumps(log_message.__dict__)
channel.basic_publish(exchange=exchange_logs_name, print(" [x] json body : ", json_body)
routing_key=routing_key_logs,
body=json_body, channel.basic_publish(exchange=exchange_logs_name,
properties=pika.BasicProperties(delivery_mode=2) routing_key=routing_key_logs,
) body=json_body,
# ------------------------------------------------------------ properties=pika.BasicProperties(delivery_mode=2)
)
# ------------------------------------------------------------
return return
......
...@@ -112,29 +112,28 @@ def process_docs( channel, method, properties, body ): ...@@ -112,29 +112,28 @@ def process_docs( channel, method, properties, body ):
logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug'])) logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug']))
progress_rounded = round(progress_ratio * 100) progress_rounded = round(progress_ratio * 100)
if progress_rounded % 10: status_message = '[' + str(progress_ratio * 100) + '%] Sending ' + str(
status_message = '[' + str(progress_rounded) + '%] Sending ' + str( len(docs)) + ' docs to RabbitMQ for dataset ' + str(docs[0]['slug']) + '...'
len(docs)) + 'docs to RabbitMQ for dataset ' + str(docs[0]['slug']) + '...' # ---------------------- send log ----------------------------
# ---------------------- send log ---------------------------- log_message = LogMessage(session_id=cfg['session']['id'],
log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'],
# session_id=cfg['session']['id'], uuid=cfg['session']['current_uuid'],
uuid=cfg['session']['current_uuid'], step='doc-processor',
step='doc-processor', status=status_message,
status=status_message, uuid_prefix='meta',
uuid_prefix='meta', info='no info'
info='no info' )
)
json_body = json.dumps(log_message.__dict__) json_body = json.dumps(log_message.__dict__)
print(" [x] json body : ", json_body) print(" [x] json body : ", json_body)
channel.basic_publish(exchange=exchange_logs_name, channel.basic_publish(exchange=exchange_logs_name,
routing_key=routing_key_logs, routing_key=routing_key_logs,
body=json_body, body=json_body,
properties=pika.BasicProperties(delivery_mode=2) properties=pika.BasicProperties(delivery_mode=2)
) )
# ------------------------------------------------------------ # ------------------------------------------------------------
docs_to_index = fix_field_types( docs, field_types ) docs_to_index = fix_field_types( docs, field_types )
...@@ -156,26 +155,27 @@ def process_docs( channel, method, properties, body ): ...@@ -156,26 +155,27 @@ def process_docs( channel, method, properties, body ):
) )
logging.info('...done!') logging.info('...done!')
# ---------------------- send log ---------------------------- if progress_rounded == 100:
log_message = LogMessage(session_id=cfg['session']['id'], # ---------------------- send log ----------------------------
# session_id=cfg['session']['id'], log_message = LogMessage(session_id=cfg['session']['id'],
uuid=cfg['session']['current_uuid'], # session_id=cfg['session']['id'],
step='doc-processor', uuid=cfg['session']['current_uuid'],
status='done', step='doc-processor',
uuid_prefix='meta', status='done',
info='no info' uuid_prefix='meta',
) info='no info'
)
json_body = json.dumps(log_message.__dict__) json_body = json.dumps(log_message.__dict__)
print(" [x] json body : ", json_body) print(" [x] json body : ", json_body)
channel.basic_publish(exchange=exchange_logs_name, channel.basic_publish(exchange=exchange_logs_name,
routing_key=routing_key_logs, routing_key=routing_key_logs,
body=json_body, body=json_body,
properties=pika.BasicProperties(delivery_mode=2) properties=pika.BasicProperties(delivery_mode=2)
) )
# ------------------------------------------------------------ # ------------------------------------------------------------
channel.basic_ack(delivery_tag = method.delivery_tag) channel.basic_ack(delivery_tag = method.delivery_tag)
return return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment