From e7b78eddd968f486127932a389d66fe05a758f97 Mon Sep 17 00:00:00 2001
From: ddamiron <ddamiron@sii.fr>
Date: Wed, 3 Jul 2019 10:59:15 +0200
Subject: [PATCH] add JSON endpoint for all progress_ratio and timestamp data

---
 api.py                        | 33 +++++++++++----
 lib/mongo_session.py          |  1 -
 main.py                       | 13 ------
 workers/doc-enricher.py       |  9 +---
 workers/metadata-processor.py | 10 ++---
 workers/reindexer.py          | 78 ++++++++++++++++++++++++++++++++---
 workers/sample-generator.py   | 50 +++++++++++-----------
 7 files changed, 131 insertions(+), 63 deletions(-)

diff --git a/api.py b/api.py
index 485f14e..0b86b01 100644
--- a/api.py
+++ b/api.py
@@ -135,8 +135,6 @@ def build_plot(session_id, step):
 
     img = io.BytesIO()
     x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
-    # y = [1,2,3,4,5]
-    # x = [0,2,1,3,4]
     plt.clf()
     plt.plot(x, y, label=step)
     plt.savefig(img, format='png')
@@ -151,12 +149,17 @@ def build_plot(session_id, step):
 def build_all_plot(session_id):
     img = io.BytesIO()
     plt.clf()
-    for step in ['main', 'doc-enricher', 'doc-processor', 'doc-indexer']:
-        x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
-        # y = [1,2,3,4,5]
-        # x = [0,2,1,3,4]
-        plt.plot(x, y, label=step)
+    for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']:
+        try:
+            x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
+            plt.plot(x, y, label=step)
+        except Exception as _:
+            print('cannot get data from :' + step)
     plt.legend()
+    plt.xlabel('time')
+    plt.ylabel('progress ratio')
+    plt.title('indexer session scripts progress report.')
+    plt.grid(True)
     plt.savefig(img, format='png')
     img.seek(0)
 
@@ -165,5 +168,21 @@ def build_all_plot(session_id):
     return '<img src="data:image/png;base64,{}">'.format(plot_url)
 
 
+@api.route('/json/<session_id>')
+def build_full_session_json(session_id):
+    data = dict()
+
+    for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']:
+        try:
+            data[step] = dict()
+            data[step]['timestamp'], data[step]['progress_ratio'] =\
+                MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
+
+        except Exception as _:
+            print('cannot get data from :' + step)
+
+    return Response(json.dumps(data, indent=4, sort_keys=True, default=str), mimetype='application/json')
+
+
 if __name__ == '__main__':
     api.run(host='0.0.0.0', port=8000, debug=True)
diff --git a/lib/mongo_session.py b/lib/mongo_session.py
index e1d9f82..41715c5 100644
--- a/lib/mongo_session.py
+++ b/lib/mongo_session.py
@@ -8,7 +8,6 @@ class MongoSession:
 
     def __init__(self, cfg):
         self.cfg = cfg
-        print('cfg passed to MongoSession')
         self.collection_name = cfg['mongo']['collection']
         self.mongo_client = self.init_client_mongo()
         self.mongo_data_collection = self.set_mongo_collection(mongo_client=self.mongo_client,
diff --git a/main.py b/main.py
index 096edf5..e0327a4 100644
--- a/main.py
+++ b/main.py
@@ -311,19 +311,6 @@ def main(cfg):
             rabbit.publish_log(log_message=log_message2.__dict__)
             # ------------------------------------------------------------
 
-            # ---------------------- send log ----------------------------
-            log_message2 = LogMessage(session_id=cfg['session']['id'],
-                                      uuid=uuid_to_get,
-                                      step='main',
-                                      status='this message is for testing only',
-                                      uuid_prefix='meta',
-                                      info='no info',
-                                      loglevel='ERROR',
-                                      progress_ratio=None
-                                      )
-            cfg['session']['current_uuid'] = uuid_to_get
-            rabbit.publish_log(log_message=log_message2.__dict__)
-            # ------------------------------------------------------------
         # <-- the rabbit connexion is automatically closed here
 
     # if 'all' not in uuids_to_get:
diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py
index 2017586..6e91d0d 100644
--- a/workers/doc-enricher.py
+++ b/workers/doc-enricher.py
@@ -17,8 +17,7 @@ from lib.serializers import encode_datetime
 from lib.log_message import LogMessage
 
 
-
-def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
+def get_entries_from_postgis(link, cfg, no_features_per_page=1000):
 
     dbname = link['url'].split('/')[-1]
     schema, table_name = link['name'].split('.')
@@ -124,10 +123,6 @@ def get_wfs( link, credentials, offset=0, no_features_per_page=1000 ):
     #print()
 
 
-
-
-
-
 def old_enrich_docs( channel, method, properties, body, **kwargs ):
 
     decoded_body = msgpack.unpackb(body, raw=False)
@@ -284,7 +279,7 @@ def enrich_docs( channel, method, properties, body ):
                                )
 
         #logging.info('...done!')
-        if progress_rounded == 1 or progress_rounded == 1.00:
+        if progress_rounded > 0.98 or progress_rounded == 1.00:
 
             # ---------------------- send log ----------------------------
             log_message = LogMessage(session_id=cfg['session']['id'],
diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py
index a83a40c..97bf4a9 100644
--- a/workers/metadata-processor.py
+++ b/workers/metadata-processor.py
@@ -324,7 +324,7 @@ def callback( channel, method, properties, body ):
 
     print(" [x] json body : ", json_body)
 
-    channel.basic_publish(exchange=exchange,
+    channel.basic_publish(exchange=exchange_logs_name,
                           routing_key=routing_key_logs,
                           body=json_body,
                           properties=pika.BasicProperties(delivery_mode=2)
@@ -391,14 +391,14 @@ def callback( channel, method, properties, body ):
                                          uuid_prefix='meta',
                                          info='no info',
                                          loglevel='INFO',
-                                         progress_ratio=0.5
+                                         progress_ratio=1
                                          )
 
                 json_body = json.dumps(log_message.__dict__)
 
                 print(" [x] json body : ", json_body)
 
-                channel.basic_publish(exchange=exchange,
+                channel.basic_publish(exchange=exchange_logs_name,
                                       routing_key=routing_key_logs,
                                       body=json_body,
                                       properties=pika.BasicProperties(delivery_mode=2)
@@ -448,7 +448,7 @@ def callback( channel, method, properties, body ):
 
         print(" [x] json body : ", json_body)
 
-        channel.basic_publish(exchange=exchange,
+        channel.basic_publish(exchange=exchange_logs_name,
                               routing_key=routing_key_logs,
                               body=json_body,
                               properties=pika.BasicProperties(delivery_mode=2)
@@ -471,7 +471,7 @@ def callback( channel, method, properties, body ):
 
     print(" [x] json body : ", json_body)
 
-    channel.basic_publish(exchange=exchange,
+    channel.basic_publish(exchange=exchange_logs_name,
                           routing_key=routing_key_logs,
                           body=json_body,
                           properties=pika.BasicProperties(delivery_mode=2)
diff --git a/workers/reindexer.py b/workers/reindexer.py
index b609832..f29ff16 100644
--- a/workers/reindexer.py
+++ b/workers/reindexer.py
@@ -42,7 +42,7 @@ def create_sampling_task(cfg, channel, uuid):
                              uuid_prefix='full',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=0.2
+                             progress_ratio=0.4
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -92,6 +92,29 @@ def on_msg_callback(channel, method, properties, body):
     # template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
     # template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
 
+    # ---------------------- send log ----------------------------
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             # session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='reindexer',
+                             status='starting',
+                             uuid_prefix='full',
+                             info=uuid,
+                             loglevel='INFO',
+                             progress_ratio=0
+                             )
+
+    json_body = json.dumps(log_message.__dict__)
+
+    print(" [x] json body : ", json_body)
+
+    channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                          routing_key=cfg['rabbitmq']['routing_key_logs'],
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------
+
     if 'source_url' in cfg['reindexer'].keys():
         es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60)
     else:
@@ -144,14 +167,36 @@ def on_msg_callback(channel, method, properties, body):
         logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid)
         logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref))
         time.sleep(5)
+        ratio = count_es / float(count_ref)
         channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
+        # ---------------------- send log ----------------------------
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 # session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='reindexer',
+                                 status='Documents are still being pushed to the source index',
+                                 uuid_prefix='full',
+                                 info=uuid,
+                                 loglevel='INFO',
+                                 progress_ratio=ratio
+                                 )
+
+        json_body = json.dumps(log_message.__dict__)
+
+        print(" [x] json body : ", json_body)
+
+        channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                              routing_key=cfg['rabbitmq']['routing_key_logs'],
+                              body=json_body,
+                              properties=pika.BasicProperties(delivery_mode=2)
+                              )
+        # ------------------------------------------------------------
         return
 
     # 1. remove already existing docs from destination index
     logging.info("Removing dataset with uuid = %s from the destination index..." % uuid)
 
     # ---------------------- send log ----------------------------
-    # message = 'Removing dataset with uuid: ' + str(uuid) + ' from the destination index...'
     message = 'Removing dataset with uuid: {:s} from the destination index...'.format(uuid)
 
     log_message = LogMessage(session_id=cfg['session']['id'],
@@ -162,7 +207,7 @@ def on_msg_callback(channel, method, properties, body):
                              uuid_prefix='full',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=0.3
+                             progress_ratio=0.8
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -218,6 +263,29 @@ def on_msg_callback(channel, method, properties, body):
         logging.debug(res)
         res = es.indices.refresh(index=index)
         logging.debug(res)
+        # ---------------------- send log ----------------------------
+
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 # session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='reindexer',
+                                 status='refreshed indices',
+                                 uuid_prefix='meta',
+                                 info=uuid,
+                                 loglevel='INFO',
+                                 progress_ratio=0.9
+                                 )
+
+        json_body = json.dumps(log_message.__dict__)
+
+        print(" [x] json body : ", json_body)
+
+        channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                              routing_key=cfg['rabbitmq']['routing_key_logs'],
+                              body=json_body,
+                              properties=pika.BasicProperties(delivery_mode=2)
+                              )
+        # ------------------------------------------------------------
     except NotFoundError:
         pass
     except Exception as exc:
@@ -234,7 +302,7 @@ def on_msg_callback(channel, method, properties, body):
                                  uuid_prefix='full',
                                  info=uuid,
                                  loglevel='ERROR',
-                                 progress_ratio=1
+                                 progress_ratio=0.5
                                  )
 
         json_body = json.dumps(log_message.__dict__)
@@ -307,7 +375,7 @@ def on_msg_callback(channel, method, properties, body):
                                      uuid_prefix='full',
                                      info=uuid,
                                      loglevel='INFO',
-                                     progress_ratio=0.4
+                                     progress_ratio=1
                                      )
 
             json_body = json.dumps(log_message.__dict__)
diff --git a/workers/sample-generator.py b/workers/sample-generator.py
index 07810a1..c530f19 100644
--- a/workers/sample-generator.py
+++ b/workers/sample-generator.py
@@ -190,7 +190,7 @@ def callback(channel, method, properties, body):
                              uuid_prefix='',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=None
+                             progress_ratio=0
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -257,7 +257,7 @@ def callback(channel, method, properties, body):
     logging.info("Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug']))
 
     # ---------------------- send log ----------------------------
-    message = "Pushing " + str(len(docs_to_index)) + " samples to Elasticsearch for dataset" + str(docs_to_index[0]['slug'])
+    message = "Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug'])
     log_message = LogMessage(session_id=cfg['session']['id'],
                              uuid=cfg['session']['current_uuid'],
                              step='sampler',
@@ -265,7 +265,7 @@ def callback(channel, method, properties, body):
                              uuid_prefix='',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=None
+                             progress_ratio=1
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -302,7 +302,7 @@ def callback(channel, method, properties, body):
                                  uuid_prefix='full',
                                  info=uuid,
                                  loglevel='ERROR',
-                                 progress_ratio=None
+                                 progress_ratio=0
                                  )
 
         json_body = json.dumps(log_message.__dict__)
@@ -322,27 +322,27 @@ def callback(channel, method, properties, body):
     #     time.sleep(5)
     #     channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
     # ---------------------- send log ----------------------------
-    message = "done"
-    log_message = LogMessage(session_id=cfg['session']['id'],
-                             uuid=cfg['session']['current_uuid'],
-                             step='sampler',
-                             status=message,
-                             uuid_prefix='',
-                             info=uuid,
-                             loglevel='INFO',
-                             progress_ratio=1
-                             )
-
-    json_body = json.dumps(log_message.__dict__)
-
-    print(" [x] json body : ", json_body)
-
-    channel.basic_publish(exchange=exchange_logs_name,
-                          routing_key=routing_key_logs,
-                          body=json_body,
-                          properties=pika.BasicProperties(delivery_mode=2)
-                          )
-    # ------------------------------------------------------------
+    # message = "done"
+    # log_message = LogMessage(session_id=cfg['session']['id'],
+    #                          uuid=cfg['session']['current_uuid'],
+    #                          step='sampler',
+    #                          status=message,
+    #                          uuid_prefix='meta',
+    #                          info=uuid,
+    #                          loglevel='INFO',
+    #                          progress_ratio=1
+    #                          )
+    #
+    # json_body = json.dumps(log_message.__dict__)
+    #
+    # print(" [x] json body : ", json_body)
+    #
+    # channel.basic_publish(exchange=exchange_logs_name,
+    #                       routing_key=routing_key_logs,
+    #                       body=json_body,
+    #                       properties=pika.BasicProperties(delivery_mode=2)
+    #                       )
+    # # ------------------------------------------------------------
 
     return
 
-- 
GitLab