diff --git a/README.md b/README.md
index 3b028aaf38eacce81558a4af4a736f59e8f11ade..ce4638fa799c339f8df62d06ba879c1ae74b5dcc 100644
--- a/README.md
+++ b/README.md
@@ -20,6 +20,19 @@ A simplified overview of the entire workflow is provided by the attached [draw.i
 7. Wait !
 8. Run `docker-compose -f docker-compose-tools.yml up metadata-getter`
 
+
+To connect to Metabase,
+go to <your-url>:3001,
+
+use the interface to create an account and
+connect to the Mongo database with the following parameters:
+1. Database type: MongoDB
+2. Name: mongo-indexer
+3. Host: mongo
+4. Database name: indexerdb
+5. Port: <mongo_port>
+6. Database username: <mongo_user_login>
+7. Database password: <mongo_user_password>
 
 # TODO
 * adding an HTTP API to trigger indexation
diff --git a/docker-compose-workers.yml b/docker-compose-workers.yml
index e54331fdb450cd6e0e199c2b3f348e2ad31bb3d0..8bac4a1878c284c14c990502b1f483a480f92e3b 100644
--- a/docker-compose-workers.yml
+++ b/docker-compose-workers.yml
@@ -88,8 +88,8 @@ services:
     image: mongo
     #restart: unless-stopped
     environment:
-      MONGO_INITDB_ROOT_USERNAME: root
-      MONGO_INITDB_ROOT_PASSWORD: example
+      MONGO_INITDB_ROOT_USERNAME: ${MONGO_USER}
+      MONGO_INITDB_ROOT_PASSWORD: ${MONGO_PASSWORD}
     ports:
       - 27017:27017
     volumes:
@@ -104,7 +104,28 @@ services:
 #    volumes:
 #      - ./config.yaml:/app/config.yaml:ro
+  metabase:
+    image: metabase/metabase
+#    restart: unless-stopped
+    ports:
+      - 3001:3000
+    volumes:
+      # declare your mount volume /host/dir:/container/dir
+      - /home/app/metabase-data:/metabase-data
+#    environment:
+#      MB_DB_TYPE: mongo
+#      MB_DB_DBNAME: indexerdb
+#      MB_DB_PORT: 27017
+#      MB_DB_USER: ${MONGO_USER}
+#      MB_DB_PASS: ${MONGO_PASSWORD}
+#      MB_DB_HOST: mongo
+    depends_on:
+      - mongo
+    links:
+      - mongo
+
 
 volumes:
   rabbitmq:
   working-directory:
   mongo:
+  metabase:
diff --git a/lib/create_mongo_user.py b/lib/create_mongo_user.py
index d680ed9182a6c49d1107fdec99ae52876b01abb3..773f0a54244784aa1ca43ee36bca4c7c77aec84b 100644
--- a/lib/create_mongo_user.py
+++ b/lib/create_mongo_user.py
@@ -8,6 +8,8 @@ def create_mongo_user():
     client.admin.authenticate('root', 'example')
     client.indexerdb.add_user('root', 'example', roles=[{'role': 'readWrite', 'db': 'indexerdb'}])
+    client.metabase.add_user('metabase', 'metabase', roles=[{'role': 'readWrite',
+                                                             'db': 'metabase'}])
 
 
 if __name__ == '__main__':
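Note that lib/create_mongo_user.py still authenticates with the hard-coded root/example pair even though docker-compose-workers.yml now reads the root credentials from MONGO_USER and MONGO_PASSWORD, and that Database.add_user() and Database.authenticate() no longer exist in pymongo 4.x. A minimal sketch of the same user creation against a recent pymongo is shown below; the connection URL and the reuse of the compose environment variables are assumptions, not part of this change.

```python
# Sketch only, not part of this patch: create the extra Metabase user with a
# recent pymongo, where Database.add_user()/authenticate() have been removed.
# The host name 'mongo' and the MONGO_USER/MONGO_PASSWORD variables are
# assumptions taken from docker-compose-workers.yml.
import os

from pymongo import MongoClient

client = MongoClient('mongodb://mongo:27017/',
                     username=os.environ['MONGO_USER'],
                     password=os.environ['MONGO_PASSWORD'])

# 'createUser' is the server-side command that the deprecated add_user() wrapped.
client.metabase.command('createUser', 'metabase',
                        pwd='metabase',
                        roles=[{'role': 'readWrite', 'db': 'metabase'}])
```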
diff --git a/tools/field-type-detector.py b/tools/field-type-detector.py
index 6ba1692c3c8f59370c1f807b42550d5adf37e1c8..5db3831be9aa6fce5574ab331f73b786a3599cce 100644
--- a/tools/field-type-detector.py
+++ b/tools/field-type-detector.py
@@ -14,8 +14,6 @@ from lib.exit_gracefully import exit_gracefully
 from lib.my_logging import logging
 from lib.postgis_helper import Remote
 
 
-
-
 class NotEmptyQueueException(Exception):
     pass
diff --git a/workers/reindexer.py b/workers/reindexer.py
index d69faef6c3911de6ae0caf103386b295342c0441..f7f35efc6f7a86b39a5f95053ae8b5a0bdfa1847 100644
--- a/workers/reindexer.py
+++ b/workers/reindexer.py
@@ -32,19 +32,7 @@ def create_sampling_task(cfg, channel, uuid):
     # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
     # channel = connection.channel()
 
-    exchange = cfg['rabbitmq']['exchange']
-    queue_name = cfg['rabbitmq']['queue_name_6']
-    routing_key = cfg['rabbitmq']['routing_key_6']
-    queue_logs_name = cfg['rabbitmq']['queue_logs_name']
-    routing_key_logs = cfg['rabbitmq']['routing_key_logs']
-    exchange_logs_name = cfg['rabbitmq']['exchange_logs_name']
-
-    channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct')
-    channel.queue_declare(queue=queue_name, durable=True)
-    channel.queue_bind(exchange=cfg['rabbitmq']['exchange'], queue=queue_name, routing_key=routing_key)
-    channel.queue_declare(queue=queue_logs_name, durable=True)
-    channel.queue_bind(exchange=exchange_logs_name, queue=queue_logs_name, routing_key=routing_key_logs)
 
     # ---------------------- send log ----------------------------
     log_message = LogMessage(session_id=cfg['session']['id'],
                              # session_id=cfg['session']['id'],
@@ -59,15 +47,15 @@ def create_sampling_task(cfg, channel, uuid):
     print(" [x] json body : ", json_body)
 
-    channel.basic_publish(exchange=exchange_logs_name,
-                          routing_key=routing_key_logs,
+    channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                          routing_key=cfg['rabbitmq']['routing_key_logs'],
                           body=json_body,
                           properties=pika.BasicProperties(delivery_mode=2)
                           )
     # ------------------------------------------------------------
 
-    channel.basic_publish( exchange=exchange,
-                           routing_key=routing_key,
+    channel.basic_publish( exchange=cfg['rabbitmq']['exchange'],
+                           routing_key=cfg['rabbitmq']['routing_key_6'],
                            body=the_body,
                            properties=pika.BasicProperties(delivery_mode = 2)
                            )
@@ -77,14 +65,26 @@ def create_sampling_task(cfg, channel, uuid):
     return
 
 
-def on_msg_callback(channel, method, properties, body):
+def on_msg_callback(channel, method, properties, body):
 
     decoded_body = msgpack.unpackb(body, raw=False)
     cfg = decoded_body['header']['cfg']
     uuid = decoded_body['body']
     count_ref = decoded_body['header']['count']
 
+    exchange = cfg['rabbitmq']['exchange']
+
+    queue_name = cfg['rabbitmq']['queue_name_6']
+    routing_key = cfg['rabbitmq']['routing_key_6']
+    queue_logs_name = cfg['rabbitmq']['queue_logs_name']
+    routing_key_logs = cfg['rabbitmq']['routing_key_logs']
+    exchange_logs_name = cfg['rabbitmq']['exchange_logs_name']
+    channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct')
+    channel.queue_declare(queue=queue_name, durable=True)
+    channel.queue_bind(exchange=cfg['rabbitmq']['exchange'], queue=queue_name, routing_key=routing_key)
+    channel.queue_declare(queue=queue_logs_name, durable=True)
+    channel.queue_bind(exchange=exchange_logs_name, queue=queue_logs_name, routing_key=routing_key_logs)
     # from lib.elasticsearch_template import template
     # template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
     # template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
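The exchange and queue declarations removed from create_sampling_task() are re-declared inside on_msg_callback() above; RabbitMQ treats these declarations as idempotent, so repeating them on every message is safe. If other workers end up needing the same setup, a small helper along the lines of the sketch below could keep it in one place; declare_rabbitmq_topology() is hypothetical and only uses cfg keys already present in this diff.

```python
# Hypothetical helper, not in the repository: groups the idempotent RabbitMQ
# declarations that on_msg_callback() now performs, using the cfg keys from this diff.
def declare_rabbitmq_topology(channel, cfg):
    rabbit = cfg['rabbitmq']
    channel.exchange_declare(exchange=rabbit['exchange'], exchange_type='direct')
    channel.queue_declare(queue=rabbit['queue_name_6'], durable=True)
    channel.queue_bind(exchange=rabbit['exchange'],
                       queue=rabbit['queue_name_6'],
                       routing_key=rabbit['routing_key_6'])
    channel.queue_declare(queue=rabbit['queue_logs_name'], durable=True)
    channel.queue_bind(exchange=rabbit['exchange_logs_name'],
                       queue=rabbit['queue_logs_name'],
                       routing_key=rabbit['routing_key_logs'])
```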
@@ -142,13 +142,35 @@ def on_msg_callback(channel, method, properties, body):
         logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid)
         logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref))
         time.sleep(5)
-        channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
         return
-
     # 1. remove already existing docs from destination index
     logging.info("Removing dataset with uuid = %s from the destination index..." % uuid)
+    # ---------------------- send log ----------------------------
+    message = 'Removing dataset with uuid: ' + str(uuid) + ' from the destination index...'
+
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             # session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='reindexer',
+                             status=message,
+                             uuid_prefix='full',
+                             info=uuid
+                             )
+
+    json_body = json.dumps(log_message.__dict__)
+
+    print(" [x] json body : ", json_body)
+
+    channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                          routing_key=cfg['rabbitmq']['routing_key_logs'],
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------
+
     es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
     index = cfg['reindexer']['destination_index']
@@ -220,10 +242,55 @@ def on_msg_callback(channel, method, properties, body):
     if '.full' in uuid:
         create_sampling_task(cfg, channel, uuid)#, reindex_task_url)
         logging.info("Created sampling task.")
+
+        # ---------------------- send log ----------------------------
+        message = "Created sampling task and found full in uuid."
+
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 # session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='reindexer',
+                                 status=message,
+                                 uuid_prefix='full',
+                                 info=uuid
+                                 )
+
+        json_body = json.dumps(log_message.__dict__)
+
+        print(" [x] json body : ", json_body)
+
+        channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                              routing_key=cfg['rabbitmq']['routing_key_logs'],
+                              body=json_body,
+                              properties=pika.BasicProperties(delivery_mode=2)
+                              )
+        # ------------------------------------------------------------
     # otherwise, remove the lock
     else:
         logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.meta', ''))
-        unlock( cfg['session']['working_directory'], uuid.replace('.meta', ''))
+        unlock(cfg['session']['working_directory'], uuid.replace('.meta', ''))
+        # ---------------------- send log ----------------------------
+        message = "Removing lock for dataset with uuid : " + str(uuid.replace('.meta', ''))
+
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 # session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='reindexer',
+                                 status=message,
+                                 uuid_prefix='meta',
+                                 info=uuid
+                                 )
+
+        json_body = json.dumps(log_message.__dict__)
+
+        print(" [x] json body : ", json_body)
+
+        channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                              routing_key=cfg['rabbitmq']['routing_key_logs'],
+                              body=json_body,
+                              properties=pika.BasicProperties(delivery_mode=2)
+                              )
+        # ------------------------------------------------------------
@@ -232,7 +299,28 @@ def on_msg_callback(channel, method, properties, body):
         #print("")
         logging.error(json.dumps(rep, indent=4))
         logging.error("Failed")
+    # ---------------------- send log ----------------------------
+    message = "done"
+
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             # session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='reindexer',
+                             status=message,
+                             uuid_prefix='',
+                             info=uuid
+                             )
+
+    json_body = json.dumps(log_message.__dict__)
+
+    print(" [x] json body : ", json_body)
+    channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
+                          routing_key=cfg['rabbitmq']['routing_key_logs'],
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------
 
     return
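The reindexer now carries several copies of the same "send log" block. A possible follow-up, sketched here under the assumption that LogMessage keeps the constructor used above, is to fold each copy into a single helper call; publish_log() is hypothetical and not part of this change.

```python
# Hypothetical refactoring sketch (not in this diff): one helper for the repeated
# "send log" blocks. Field names and cfg keys are copied from the code above.
import json

import pika

from lib.log_message import LogMessage


def publish_log(channel, cfg, step, status, uuid, uuid_prefix=''):
    log_message = LogMessage(session_id=cfg['session']['id'],
                             uuid=cfg['session']['current_uuid'],
                             step=step,
                             status=status,
                             uuid_prefix=uuid_prefix,
                             info=uuid)
    json_body = json.dumps(log_message.__dict__)
    channel.basic_publish(exchange=cfg['rabbitmq']['exchange_logs_name'],
                          routing_key=cfg['rabbitmq']['routing_key_logs'],
                          body=json_body,
                          properties=pika.BasicProperties(delivery_mode=2))
```

Each block above would then shrink to a call such as publish_log(channel, cfg, 'reindexer', 'Created sampling task and found full in uuid.', uuid, uuid_prefix='full').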
diff --git a/workers/sample-generator.py b/workers/sample-generator.py
index 70d1c038d882044a3cb51869ccfa5e98f54dd9a5..1ff6a03022ead69e78fd9960642bc626f4b042c1 100644
--- a/workers/sample-generator.py
+++ b/workers/sample-generator.py
@@ -17,120 +17,120 @@ from lib.locker import unlock
 from lib.log_message import LogMessage
 
 
-def old_callback(channel, method, properties, body):
-
-    decoded_body = msgpack.unpackb(body, raw=False)
-    print(decoded_body)
-
-    reindex_task_url = decoded_body['reindex_task_url']
-
-    res = requests.get(reindex_task_url)
-    data = res.json()
-
-    print( json.dumps(data, indent=4) )
-
-    if data['completed'] == True:
-        print(True)
-
-        # delete all the already existing samples
-
-        destin_es = Elasticsearch([decoded_body['dest_url']], timeout=60)
-        destin_es.indices.refresh(index=decoded_body['dest_index'])
-
-        the_query = dict()
-        the_query['query'] = dict()
-        the_query['query']['term'] = {'editorial-metadata.isSample': True}
-
-        res = destin_es.delete_by_query(decoded_body['dest_index'], doc_type='_doc', body=the_query)
-
-        print( json.dumps(res) )
-
-        #exit(1)
-
-        # get sample records from the ingest index
-        source_es = Elasticsearch([decoded_body['source_url']], timeout=60)
-
-        the_query = dict()
-        the_query['size'] = 0
-        # the_query['query'] = dict()
-        # the_query['query']['exists'] = dict()
-        # the_query['query']['exists']['field'] = "data-fr.properties.address.addressCountry"
-        the_query['aggs'] = dict()
-        the_query['aggs']['my-aggregation'] = dict()
-        the_query['aggs']['my-aggregation']['terms'] = dict()
-        the_query['aggs']['my-aggregation']['terms']['field'] = "uuid"
-        the_query['aggs']['my-aggregation']['terms']['size'] = 2000
-
-
-        res = source_es.search(decoded_body['source_index'], '_doc', the_query)
-
-        # print( json.dumps(res['aggregations'], indent=4) )
-
-
-        print( json.dumps(res['aggregations']['my-aggregation']['buckets'], indent=4) )
-
-
-        full_uuids = []
-
-        for bucket in res['aggregations']['my-aggregation']['buckets']:
-            if '.full' in bucket['key']:
-                full_uuids.append(bucket['key'])
-
-        print(full_uuids)
-
-
-        for uuid in full_uuids:
-            print(uuid)
-
-            the_query = dict()
-            the_query['size'] = 10
-            the_query['query'] = dict()
-            the_query['query']['match'] = {'uuid': uuid}
-
-            res = source_es.search(decoded_body['source_index'], '_doc', the_query)
-            print( len(res['hits']['hits']) )
-
-            docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ]
-
-            t1 = time.time()
-
-            # push sample records to the destination index
-            es_body = ''
-            header = { "index" : { "_index" : decoded_body['dest_index'], "_type" : "_doc" } }
-            for doc in docs_to_index:
-                # try:
-                #     header['index']['_id'] = doc['_id'] #hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-                #     del doc['_id']
-                # except:
-                #     header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-
-                #header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-
-                doc['editorial-metadata']['isSample'] = True
-                doc['uuid'] = uuid.replace('.full', '.meta')
-
-                es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
-
-
-            rep = destin_es.bulk(body=es_body)
-
-            t2 = time.time()
-
-            if rep['errors'] == False:
-                channel.basic_ack(delivery_tag = method.delivery_tag)
-                logging.info("Done in %s seconds." % (t2-t1))
-                destin_es.indices.refresh(index=decoded_body['dest_index'])
-            else:
-                channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
-                logging.error(json.dumps(rep, indent=4))
-                logging.error("Failed")
-
-    else:
-
-        time.sleep(5)
-        channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
-
-    return
+# def old_callback(channel, method, properties, body):
+#
+#     decoded_body = msgpack.unpackb(body, raw=False)
+#     print(decoded_body)
+#
+#     reindex_task_url = decoded_body['reindex_task_url']
+#
+#     res = requests.get(reindex_task_url)
+#     data = res.json()
+#
+#     print( json.dumps(data, indent=4) )
+#
+#     if data['completed'] == True:
+#         print(True)
+#
+#         # delete all the already existing samples
+#
+#         destin_es = Elasticsearch([decoded_body['dest_url']], timeout=60)
+#         destin_es.indices.refresh(index=decoded_body['dest_index'])
+#
+#         the_query = dict()
+#         the_query['query'] = dict()
+#         the_query['query']['term'] = {'editorial-metadata.isSample': True}
+#
+#         res = destin_es.delete_by_query(decoded_body['dest_index'], doc_type='_doc', body=the_query)
+#
+#         print( json.dumps(res) )
+#
+#         #exit(1)
+#
+#         # get sample records from the ingest index
+#         source_es = Elasticsearch([decoded_body['source_url']], timeout=60)
+#
+#         the_query = dict()
+#         the_query['size'] = 0
+#         # the_query['query'] = dict()
+#         # the_query['query']['exists'] = dict()
+#         # the_query['query']['exists']['field'] = "data-fr.properties.address.addressCountry"
+#         the_query['aggs'] = dict()
+#         the_query['aggs']['my-aggregation'] = dict()
+#         the_query['aggs']['my-aggregation']['terms'] = dict()
+#         the_query['aggs']['my-aggregation']['terms']['field'] = "uuid"
+#         the_query['aggs']['my-aggregation']['terms']['size'] = 2000
+#
+#
+#         res = source_es.search(decoded_body['source_index'], '_doc', the_query)
+#
+#         # print( json.dumps(res['aggregations'], indent=4) )
+#
+#
+#         print( json.dumps(res['aggregations']['my-aggregation']['buckets'], indent=4) )
+#
+#
+#         full_uuids = []
+#
+#         for bucket in res['aggregations']['my-aggregation']['buckets']:
+#             if '.full' in bucket['key']:
+#                 full_uuids.append(bucket['key'])
+#
+#         print(full_uuids)
+#
+#
+#         for uuid in full_uuids:
+#             print(uuid)
+#
+#             the_query = dict()
+#             the_query['size'] = 10
+#             the_query['query'] = dict()
+#             the_query['query']['match'] = {'uuid': uuid}
+#
+#             res = source_es.search(decoded_body['source_index'], '_doc', the_query)
+#             print( len(res['hits']['hits']) )
+#
+#             docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ]
+#
+#             t1 = time.time()
+#
+#             # push sample records to the destination index
+#             es_body = ''
+#             header = { "index" : { "_index" : decoded_body['dest_index'], "_type" : "_doc" } }
+#             for doc in docs_to_index:
+#                 # try:
+#                 #     header['index']['_id'] = doc['_id'] #hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
+#                 #     del doc['_id']
+#                 # except:
+#                 #     header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
+#
+#                 #header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
+#
+#                 doc['editorial-metadata']['isSample'] = True
+#                 doc['uuid'] = uuid.replace('.full', '.meta')
+#
+#                 es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
+#
+#
+#             rep = destin_es.bulk(body=es_body)
+#
+#             t2 = time.time()
+#
+#             if rep['errors'] == False:
+#                 channel.basic_ack(delivery_tag = method.delivery_tag)
+#                 logging.info("Done in %s seconds." % (t2-t1))
+#                 destin_es.indices.refresh(index=decoded_body['dest_index'])
+#             else:
+#                 channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+#                 logging.error(json.dumps(rep, indent=4))
+#                 logging.error("Failed")
+#
+#     else:
+#
+#         time.sleep(5)
+#         channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+#
+#     return
 
@@ -182,12 +182,12 @@ def callback(channel, method, properties, body):
 
     logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug'])
 
     # ---------------------- send log ----------------------------
-    message = "Deleting already existing samples for dataset with slug =" + str(docs_to_index[0]['slug'])
+    message = "Deleting already existing samples for dataset with slug = " + str(docs_to_index[0]['slug'])
     log_message = LogMessage(session_id=cfg['session']['id'],
                              uuid=cfg['session']['current_uuid'],
                              step='sampler',
                              status=message,
-                             uuid_prefix='full',
+                             uuid_prefix='',
                              info=uuid
                              )
@@ -237,7 +237,7 @@ def callback(channel, method, properties, body):
                              uuid=cfg['session']['current_uuid'],
                              step='sampler',
                              status=message,
-                             uuid_prefix='full',
+                             uuid_prefix='',
                              info=uuid
                              )
 
@@ -271,6 +271,26 @@ def callback(channel, method, properties, body):
     #
     #     time.sleep(5)
     #     channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+    # ---------------------- send log ----------------------------
+    message = "done"
+    log_message = LogMessage(session_id=cfg['session']['id'],
+                             uuid=cfg['session']['current_uuid'],
+                             step='sampler',
+                             status=message,
+                             uuid_prefix='',
+                             info=uuid
+                             )
+
+    json_body = json.dumps(log_message.__dict__)
+
+    print(" [x] json body : ", json_body)
+
+    channel.basic_publish(exchange=exchange_logs_name,
+                          routing_key=routing_key_logs,
+                          body=json_body,
+                          properties=pika.BasicProperties(delivery_mode=2)
+                          )
+    # ------------------------------------------------------------
 
     return
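Both reindexer.py and sample-generator.py now publish a final "done" status to the logs exchange. A quick manual check that these messages actually arrive is to consume a few of them directly; the sketch below hard-codes the RabbitMQ host and the logs queue name purely for illustration, whereas the real values live in config.yaml.

```python
# Manual smoke test (assumptions: host 'rabbitmq', logs queue 'logs'; in practice
# read both from the same config.yaml the workers use).
import json

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()


def on_log(ch, method, properties, body):
    # Pretty-print each LogMessage published by the workers, then acknowledge it.
    print(json.dumps(json.loads(body), indent=2))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(queue='logs', on_message_callback=on_log)
channel.start_consuming()
```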