diff --git a/api.py b/api.py
index 485f14e2bc6478289ac559785119791b7ea4f70f..0b86b01423400aaf5c5557ab73f239b1e6993975 100644
--- a/api.py
+++ b/api.py
@@ -135,8 +135,6 @@ def build_plot(session_id, step):
     img = io.BytesIO()
 
     x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
-    # y = [1,2,3,4,5]
-    # x = [0,2,1,3,4]
     plt.clf()
     plt.plot(x, y, label=step)
     plt.savefig(img, format='png')
@@ -151,12 +149,17 @@ def build_plot(session_id, step):
 def build_all_plot(session_id):
     img = io.BytesIO()
     plt.clf()
-    for step in ['main', 'doc-enricher', 'doc-processor', 'doc-indexer']:
-        x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
-        # y = [1,2,3,4,5]
-        # x = [0,2,1,3,4]
-        plt.plot(x, y, label=step)
+    for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']:
+        try:
+            x, y = MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
+            plt.plot(x, y, label=step)
+        except Exception:
+            print('cannot get data from: ' + step)
     plt.legend()
+    plt.xlabel('time')
+    plt.ylabel('progress ratio')
+    plt.title('Indexer session scripts progress report')
+    plt.grid(True)
     plt.savefig(img, format='png')
 
     img.seek(0)
@@ -165,5 +168,21 @@ def build_all_plot(session_id):
     return '<img src="data:image/png;base64,{}">'.format(plot_url)
 
 
+@api.route('/json/<session_id>')
+def build_full_session_json(session_id):
+    data = dict()
+
+    for step in ['main', 'metadata-processor', 'doc-enricher', 'doc-processor', 'doc-indexer', 'reindexer', 'sampler']:
+        try:
+            timestamps, ratios = \
+                MongoSession(cfg=cfg).get_array_from_step(this_session_id=session_id, step_name=step)
+            data[step] = {'timestamp': timestamps, 'progress_ratio': ratios}
+
+        except Exception:
+            print('cannot get data from: ' + step)
+
+    return Response(json.dumps(data, indent=4, sort_keys=True, default=str), mimetype='application/json')
+
+
 if __name__ == '__main__':
     api.run(host='0.0.0.0', port=8000, debug=True)
diff --git a/lib/mongo_session.py b/lib/mongo_session.py
index e1d9f82e2daec880e0671db9a3490e92cdba51a1..41715c5a4b3299eb27de058476d90114c194f91c 100644
--- a/lib/mongo_session.py
+++ b/lib/mongo_session.py
@@ -8,7 +8,6 @@ class MongoSession:
 
     def __init__(self, cfg):
         self.cfg = cfg
-        print('cfg passed to MongoSession')
         self.collection_name = cfg['mongo']['collection']
         self.mongo_client = self.init_client_mongo()
         self.mongo_data_collection = self.set_mongo_collection(mongo_client=self.mongo_client,
diff --git a/main.py b/main.py
index 096edf5b355aaebf546aecd4369740df267efe9e..e0327a4d6c7186d5e29b38cd21087b81d4c87b2d 100644
--- a/main.py
+++ b/main.py
@@ -311,19 +311,6 @@ def main(cfg):
             rabbit.publish_log(log_message=log_message2.__dict__)
             # ------------------------------------------------------------
 
-            # ---------------------- send log ----------------------------
-            log_message2 = LogMessage(session_id=cfg['session']['id'],
-                                      uuid=uuid_to_get,
-                                      step='main',
-                                      status='this message is for testing only',
-                                      uuid_prefix='meta',
-                                      info='no info',
-                                      loglevel='ERROR',
-                                      progress_ratio=None
-                                      )
-            cfg['session']['current_uuid'] = uuid_to_get
-            rabbit.publish_log(log_message=log_message2.__dict__)
-            # ------------------------------------------------------------
 
     # <-- the rabbit connexion is automatically closed here
     # if 'all' not in uuids_to_get:
diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py
index 2017586d1080e35d79a31ccb3f0d107aa9086b81..6e91d0d2441607ee01e54fe358f095450e13484c 100644
--- a/workers/doc-enricher.py
+++ b/workers/doc-enricher.py
@@ -17,8 +17,7 @@ from lib.serializers import encode_datetime
 from lib.log_message import LogMessage
 
 
-
-def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
+def get_entries_from_postgis(link, cfg, no_features_per_page=1000):
 
     dbname = link['url'].split('/')[-1]
     schema, table_name = link['name'].split('.')
@@ -124,10 +123,6 @@ def get_wfs( link, credentials, offset=0, no_features_per_page=1000 ):
     #print()
 
 
-
-
-
-
 def old_enrich_docs( channel, method, properties, body, **kwargs ):
 
     decoded_body = msgpack.unpackb(body, raw=False)
@@ -284,7 +279,7 @@ def enrich_docs( channel, method, properties, body ):
                              )
 
         #logging.info('...done!')
-        if progress_rounded == 1 or progress_rounded == 1.00:
+        if progress_rounded > 0.98:
 
             # ---------------------- send log ----------------------------
             log_message = LogMessage(session_id=cfg['session']['id'],
diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py
index a83a40c9ec512e23aeb5bd31fd6b8e1e63bdd31e..97bf4a995389fe21e46e2869c2ae9397025d699a 100644
--- a/workers/metadata-processor.py
+++ b/workers/metadata-processor.py
@@ -324,7 +324,7 @@ def callback( channel, method, properties, body ):
     print(" [x] json body : ", json_body)
 
-    channel.basic_publish(exchange=exchange,
+    channel.basic_publish(exchange=exchange_logs_name,
                           routing_key=routing_key_logs,
                           body=json_body,
                           properties=pika.BasicProperties(delivery_mode=2)
                           )
@@ -391,14 +391,14 @@ def callback( channel, method, properties, body ):
                              uuid_prefix='meta',
                              info='no info',
                              loglevel='INFO',
-                             progress_ratio=0.5
+                             progress_ratio=1
                              )
     json_body = json.dumps(log_message.__dict__)
 
     print(" [x] json body : ", json_body)
 
-    channel.basic_publish(exchange=exchange,
+    channel.basic_publish(exchange=exchange_logs_name,
                           routing_key=routing_key_logs,
                           body=json_body,
                           properties=pika.BasicProperties(delivery_mode=2)
                           )
@@ -448,7 +448,7 @@ def callback( channel, method, properties, body ):
     print(" [x] json body : ", json_body)
 
-    channel.basic_publish(exchange=exchange,
+    channel.basic_publish(exchange=exchange_logs_name,
                           routing_key=routing_key_logs,
                           body=json_body,
                           properties=pika.BasicProperties(delivery_mode=2)
                           )
@@ -471,7 +471,7 @@ def callback( channel, method, properties, body ):
     print(" [x] json body : ", json_body)
 
-    channel.basic_publish(exchange=exchange,
+    channel.basic_publish(exchange=exchange_logs_name,
                           routing_key=routing_key_logs,
                           body=json_body,
                           properties=pika.BasicProperties(delivery_mode=2)
                           )
diff --git a/workers/reindexer.py b/workers/reindexer.py
index b60983292d00411a5f071d3230b9ca33d7c56647..f29ff16180bda1f00b9b49463d94a56d85879b72 100644
--- a/workers/reindexer.py
+++ b/workers/reindexer.py
@@ -42,7 +42,7 @@ def create_sampling_task(cfg, channel, uuid):
                              uuid_prefix='full',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=0.2
+                             progress_ratio=0.4
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -92,6 +92,29 @@ def on_msg_callback(channel, method, properties, body):
     # template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
     # template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
 
+    # ---------------------- send log ----------------------------
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             # session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='reindexer',
+                             status='starting',
+                             uuid_prefix='full',
+                             info=uuid,
+                             loglevel='INFO',
+                             progress_ratio=0
+                             )
+
+    json_body = json.dumps(log_message.__dict__)
+
print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'], + routing_key=cfg['rabbitmq']['routing_key_logs'], + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ + if 'source_url' in cfg['reindexer'].keys(): es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) else: @@ -144,14 +167,36 @@ def on_msg_callback(channel, method, properties, body): logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid) logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref)) time.sleep(5) + ratio = count_es / float(count_ref) channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1) + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='reindexer', + status='Documents are still being pushed to the source index', + uuid_prefix='full', + info=uuid, + loglevel='INFO', + progress_ratio=ratio + ) + + json_body = json.dumps(log_message.__dict__) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'], + routing_key=cfg['rabbitmq']['routing_key_logs'], + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ return # 1. remove already existing docs from destination index logging.info("Removing dataset with uuid = %s from the destination index..." % uuid) # ---------------------- send log ---------------------------- - # message = 'Removing dataset with uuid: ' + str(uuid) + ' from the destination index...' 
+    message = 'Removing dataset with uuid: {:s} from the destination index...'.format(uuid)
     log_message = LogMessage(session_id=cfg['session']['id'],
@@ -162,7 +207,7 @@ def on_msg_callback(channel, method, properties, body):
                              uuid_prefix='full',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=0.3
+                             progress_ratio=0.8
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -218,6 +263,29 @@ def on_msg_callback(channel, method, properties, body):
             logging.debug(res)
             res = es.indices.refresh(index=index)
             logging.debug(res)
+            # ---------------------- send log ----------------------------
+
+            log_message = LogMessage(session_id=cfg['session']['id'],
+                                     # session_id=cfg['session']['id'],
+                                     uuid=cfg['session']['current_uuid'],
+                                     step='reindexer',
+                                     status='refreshed indices',
+                                     uuid_prefix='meta',
+                                     info=uuid,
+                                     loglevel='INFO',
+                                     progress_ratio=0.9
+                                     )
+
+            json_body = json.dumps(log_message.__dict__)
+
+            print(" [x] json body : ", json_body)
+
+            channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                                  routing_key=cfg['rabbitmq']['routing_key_logs'],
+                                  body=json_body,
+                                  properties=pika.BasicProperties(delivery_mode=2)
+                                  )
+            # ------------------------------------------------------------
         except NotFoundError:
             pass
         except Exception as exc:
@@ -234,7 +302,7 @@ def on_msg_callback(channel, method, properties, body):
                              uuid_prefix='full',
                              info=uuid,
                              loglevel='ERROR',
-                             progress_ratio=1
+                             progress_ratio=0.5
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -307,7 +375,7 @@ def on_msg_callback(channel, method, properties, body):
                              uuid_prefix='full',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=0.4
+                             progress_ratio=1
                              )
 
     json_body = json.dumps(log_message.__dict__)
diff --git a/workers/sample-generator.py b/workers/sample-generator.py
index 07810a172f46d481d714cc74a43bfd1745827fa8..c530f193fe69674aaf011a6ed685cbd4ff065197 100644
--- a/workers/sample-generator.py
+++ b/workers/sample-generator.py
@@ -190,7 +190,7 @@ def callback(channel, method, properties, body):
                              uuid_prefix='',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=None
+                             progress_ratio=0
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -257,7 +257,7 @@ def callback(channel, method, properties, body):
     logging.info("Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug']))
 
     # ---------------------- send log ----------------------------
-    message = "Pushing " + str(len(docs_to_index)) + " samples to Elasticsearch for dataset" + str(docs_to_index[0]['slug'])
+    message = "Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug'])
     log_message = LogMessage(session_id=cfg['session']['id'],
                              uuid=cfg['session']['current_uuid'],
                              step='sampler',
@@ -265,7 +265,7 @@ def callback(channel, method, properties, body):
                              uuid_prefix='',
                              info=uuid,
                              loglevel='INFO',
-                             progress_ratio=None
+                             progress_ratio=1
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -302,7 +302,7 @@ def callback(channel, method, properties, body):
                              uuid_prefix='full',
                              info=uuid,
                              loglevel='ERROR',
-                             progress_ratio=None
+                             progress_ratio=0
                              )
 
     json_body = json.dumps(log_message.__dict__)
@@ -322,27 +322,27 @@ def callback(channel, method, properties, body):
 
     # time.sleep(5)
     # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
     # ---------------------- send log ----------------------------
-    message = "done"
-    log_message = LogMessage(session_id=cfg['session']['id'],
-                             uuid=cfg['session']['current_uuid'],
-                             step='sampler',
-                             status=message,
-                             uuid_prefix='',
-                             info=uuid,
-                             loglevel='INFO',
-                             progress_ratio=1
-                             )
-
-    json_body = json.dumps(log_message.__dict__)
-
-    print(" [x] json body : ", json_body)
-
-    channel.basic_publish(exchange=exchange_logs_name,
-                          routing_key=routing_key_logs,
-                          body=json_body,
-                          properties=pika.BasicProperties(delivery_mode=2)
-                          )
-    # ------------------------------------------------------------
+    # message = "done"
+    # log_message = LogMessage(session_id=cfg['session']['id'],
+    #                          uuid=cfg['session']['current_uuid'],
+    #                          step='sampler',
+    #                          status=message,
+    #                          uuid_prefix='meta',
+    #                          info=uuid,
+    #                          loglevel='INFO',
+    #                          progress_ratio=1
+    #                          )
+    #
+    # json_body = json.dumps(log_message.__dict__)
+    #
+    # print(" [x] json body : ", json_body)
+    #
+    # channel.basic_publish(exchange=exchange_logs_name,
+    #                       routing_key=routing_key_logs,
+    #                       body=json_body,
+    #                       properties=pika.BasicProperties(delivery_mode=2)
+    #                       )
+    # # ------------------------------------------------------------
 
     return