Skip to content
Snippets Groups Projects
Commit 1f12dc5a authored by ddamiron's avatar ddamiron
Browse files

add get json for all progress_ratio and timestamp end-point

parent e7b78edd
Branches
No related tags found
No related merge requests found
...@@ -149,7 +149,8 @@ def build_plot(session_id, step): ...@@ -149,7 +149,8 @@ def build_plot(session_id, step):
def build_all_plot(session_id): def build_all_plot(session_id):
img = io.BytesIO() img = io.BytesIO()
plt.clf() plt.clf()
for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']: for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer',
'pre-reindexer-checker', 'reindexer', 'sampler']:
try: try:
x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step) x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
plt.plot(x, y, label=step) plt.plot(x, y, label=step)
...@@ -158,8 +159,8 @@ def build_all_plot(session_id): ...@@ -158,8 +159,8 @@ def build_all_plot(session_id):
plt.legend() plt.legend()
plt.xlabel('time') plt.xlabel('time')
plt.ylabel('progress ratio') plt.ylabel('progress ratio')
plt.title('indexer session scripts progress report.') plt.title('indexer-session-scripts: progress-report.')
plt.grid(True) plt.grid(False)
plt.savefig(img, format='png') plt.savefig(img, format='png')
img.seek(0) img.seek(0)
...@@ -172,7 +173,8 @@ def build_all_plot(session_id): ...@@ -172,7 +173,8 @@ def build_all_plot(session_id):
def build_full_session_json(session_id): def build_full_session_json(session_id):
data = dict() data = dict()
for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']: for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer',
'pre-reindexer-checker', 'reindexer', 'sampler']:
try: try:
data[step] = dict() data[step] = dict()
data[step]['timestamp'], data[step]['progress_ratio'] =\ data[step]['timestamp'], data[step]['progress_ratio'] =\
......
...@@ -100,7 +100,9 @@ class MongoSession: ...@@ -100,7 +100,9 @@ class MongoSession:
data_time = [] data_time = []
data_value = [] data_value = []
request_result = self.mongo_data_collection.find({"session_id": this_session_id, "step": step_name}, request_result = self.mongo_data_collection.find({"session_id": this_session_id,
"step": step_name,
'loglevel': 'INFO'},
{'timestamp': 1, 'progress_ratio': 1}) {'timestamp': 1, 'progress_ratio': 1})
for res in request_result: for res in request_result:
......
...@@ -173,7 +173,7 @@ def on_msg_callback(channel, method, properties, body): ...@@ -173,7 +173,7 @@ def on_msg_callback(channel, method, properties, body):
log_message = LogMessage(session_id=cfg['session']['id'], log_message = LogMessage(session_id=cfg['session']['id'],
# session_id=cfg['session']['id'], # session_id=cfg['session']['id'],
uuid=cfg['session']['current_uuid'], uuid=cfg['session']['current_uuid'],
step='reindexer', step='pre-reindexer-checker',
status='Documents are still being pushed to the source index', status='Documents are still being pushed to the source index',
uuid_prefix='full', uuid_prefix='full',
info=uuid, info=uuid,
...@@ -302,7 +302,7 @@ def on_msg_callback(channel, method, properties, body): ...@@ -302,7 +302,7 @@ def on_msg_callback(channel, method, properties, body):
uuid_prefix='full', uuid_prefix='full',
info=uuid, info=uuid,
loglevel='ERROR', loglevel='ERROR',
progress_ratio=0.5 progress_ratio=None
) )
json_body = json.dumps(log_message.__dict__) json_body = json.dumps(log_message.__dict__)
......
...@@ -182,12 +182,12 @@ def callback(channel, method, properties, body): ...@@ -182,12 +182,12 @@ def callback(channel, method, properties, body):
logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug']) logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug'])
# ---------------------- send log ---------------------------- # ---------------------- send log ----------------------------
message = "Deleting already existing samples for dataset with slug = {:s} ".format(docs_to_index[0]['slug']) message = "Deleting already existing samples for dataset "
log_message = LogMessage(session_id=cfg['session']['id'], log_message = LogMessage(session_id=cfg['session']['id'],
uuid=cfg['session']['current_uuid'], uuid=cfg['session']['current_uuid'],
step='sampler', step='sampler',
status=message, status=message,
uuid_prefix='', uuid_prefix='meta',
info=uuid, info=uuid,
loglevel='INFO', loglevel='INFO',
progress_ratio=0 progress_ratio=0
...@@ -224,7 +224,7 @@ def callback(channel, method, properties, body): ...@@ -224,7 +224,7 @@ def callback(channel, method, properties, body):
uuid_prefix='full', uuid_prefix='full',
info=uuid, info=uuid,
loglevel='ERROR', loglevel='ERROR',
progress_ratio=None progress_ratio=0
) )
json_body = json.dumps(log_message.__dict__) json_body = json.dumps(log_message.__dict__)
...@@ -265,7 +265,7 @@ def callback(channel, method, properties, body): ...@@ -265,7 +265,7 @@ def callback(channel, method, properties, body):
uuid_prefix='', uuid_prefix='',
info=uuid, info=uuid,
loglevel='INFO', loglevel='INFO',
progress_ratio=1 progress_ratio=0.9
) )
json_body = json.dumps(log_message.__dict__) json_body = json.dumps(log_message.__dict__)
...@@ -289,6 +289,29 @@ def callback(channel, method, properties, body): ...@@ -289,6 +289,29 @@ def callback(channel, method, properties, body):
destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) destin_es.indices.refresh(index=cfg['reindexer']['destination_index'])
logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.full', '')) logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.full', ''))
unlock( cfg['session']['working_directory'], uuid.replace('.full', '') ) unlock( cfg['session']['working_directory'], uuid.replace('.full', '') )
# ---------------------- send log ----------------------------
log_message = LogMessage(session_id=cfg['session']['id'],
# session_id=cfg['session']['id'],
uuid=cfg['session']['current_uuid'],
step='sampler',
status='Removing lock for dataset',
uuid_prefix='full',
info=uuid,
loglevel='INFO',
progress_ratio=1
)
json_body = json.dumps(log_message.__dict__)
print(" [x] json body : ", json_body)
channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
routing_key=cfg['rabbitmq']['routing_key_logs'],
body=json_body,
properties=pika.BasicProperties(delivery_mode=2)
)
# ------------------------------------------------------------
else: else:
channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
logging.error(json.dumps(rep, indent=4)) logging.error(json.dumps(rep, indent=4))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment