diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py
index 1f967de7d02160898fa09f64899941f99b12c267..8eebf8cc86a7ad712b4ced4cbf367b90db910f84 100644
--- a/workers/metadata-processor.py
+++ b/workers/metadata-processor.py
@@ -372,7 +372,7 @@ def callback( channel, method, properties, body ):
 
     if not wfs_found:
         meta_version = out_record.copy() # including metadata ONLY
-        meta_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.meta'
+        meta_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.full'
         meta_version['type'] = out_record['metadata-fr']['type']
         meta_version['slug'] = the_slug
         meta_version['last_update'] = last_update
diff --git a/workers/reindexer.py b/workers/reindexer.py
index 6ddd03bded463b0de3b0cba7e2d029f0ffe71138..0bc029909e2358cd696039ca0e43b6cc0da28aef 100644
--- a/workers/reindexer.py
+++ b/workers/reindexer.py
@@ -21,7 +21,7 @@ class NotEmptyQueueException(Exception):
 
 def create_sampling_task(cfg, channel, uuid):
 
-    # here-below we generate a task for the sample generator (full -> meta)
+    # here-below we generate a task for the sample generator (full -> sample)
     msg = dict()
     msg['header'] = dict()
    msg['header']['cfg'] = cfg
@@ -233,16 +233,9 @@ def on_msg_callback(channel, method, properties, body):
 
         reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task'])
         logging.info("Created reindex task: {0}".format(reindex_task_url))
 
-        # 3. create sampling task (full -> meta)
-        if '.full' in uuid:
-            create_sampling_task(cfg, channel, uuid)#, reindex_task_url)
-            logging.info("Created sampling task.")
-        # otherwise, remove the lock
-        else:
-            logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.meta', ''))
-            unlock( cfg['session']['working_directory'], uuid.replace('.meta', ''))
-
-
+        # 3. create sampling task (full -> sample)
+        create_sampling_task(cfg, channel, uuid)#, reindex_task_url)
+        logging.info("Created sampling task.")
 
     else:
         channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
diff --git a/workers/sample-generator.py b/workers/sample-generator.py
index 5a750c4d61c4dc8c6aa4d1f2186cb3ca93be8b0e..d86d5f3dc64d57cdb6d9cfb64a55d81c826c0e76 100644
--- a/workers/sample-generator.py
+++ b/workers/sample-generator.py
@@ -14,121 +14,6 @@ from lib.exit_gracefully import exit_gracefully
 from lib.my_logging import logging
 from lib.locker import unlock
 
-def old_callback(channel, method, properties, body):
-
-    decoded_body = msgpack.unpackb(body, raw=False)
-    print(decoded_body)
-
-    reindex_task_url = decoded_body['reindex_task_url']
-
-    res = requests.get(reindex_task_url)
-    data = res.json()
-
-    print( json.dumps(data, indent=4) )
-
-    if data['completed'] == True:
-        print(True)
-
-        # delete all the already existing samples
-
-        destin_es = Elasticsearch([decoded_body['dest_url']], timeout=60)
-        destin_es.indices.refresh(index=decoded_body['dest_index'])
-
-        the_query = dict()
-        the_query['query'] = dict()
-        the_query['query']['term'] = {'editorial-metadata.isSample': True}
-
-        res = destin_es.delete_by_query(decoded_body['dest_index'], doc_type='_doc', body=the_query)
-
-        print( json.dumps(res) )
-
-        #exit(1)
-
-        # get sample records from the ingest index
-        source_es = Elasticsearch([decoded_body['source_url']], timeout=60)
-
-        the_query = dict()
-        the_query['size'] = 0
-        # the_query['query'] = dict()
-        # the_query['query']['exists'] = dict()
-        # the_query['query']['exists']['field'] = "data-fr.properties.address.addressCountry"
-        the_query['aggs'] = dict()
-        the_query['aggs']['my-aggregation'] = dict()
-        the_query['aggs']['my-aggregation']['terms'] = dict()
-        the_query['aggs']['my-aggregation']['terms']['field'] = "uuid"
-        the_query['aggs']['my-aggregation']['terms']['size'] = 2000
-
-
-        res = source_es.search(decoded_body['source_index'], '_doc', the_query)
-
-        # print( json.dumps(res['aggregations'], indent=4) )
-
-
-        print( json.dumps(res['aggregations']['my-aggregation']['buckets'], indent=4) )
-
-
-        full_uuids = []
-
-        for bucket in res['aggregations']['my-aggregation']['buckets']:
-            if '.full' in bucket['key']:
-                full_uuids.append(bucket['key'])
-
-        print(full_uuids)
-
-
-        for uuid in full_uuids:
-            print(uuid)
-
-            the_query = dict()
-            the_query['size'] = 10
-            the_query['query'] = dict()
-            the_query['query']['match'] = {'uuid': uuid}
-
-            res = source_es.search(decoded_body['source_index'], '_doc', the_query)
-            print( len(res['hits']['hits']) )
-
-            docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ]
-
-            t1 = time.time()
-
-            # push sample records to the destination index
-            es_body = ''
-            header = { "index" : { "_index" : decoded_body['dest_index'], "_type" : "_doc" } }
-            for doc in docs_to_index:
-                # try:
-                #     header['index']['_id'] = doc['_id'] #hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-                #     del doc['_id']
-                # except:
-                #     header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-
-                #header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-
-                doc['editorial-metadata']['isSample'] = True
-                doc['uuid'] = uuid.replace('.full', '.meta')
-
-                es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
-
-
-            rep = destin_es.bulk(body=es_body)
-
-            t2 = time.time()
-
-            if rep['errors'] == False:
-                channel.basic_ack(delivery_tag = method.delivery_tag)
-                logging.info("Done in %s seconds." % (t2-t1))
-                destin_es.indices.refresh(index=decoded_body['dest_index'])
-            else:
-                channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
-                logging.error(json.dumps(rep, indent=4))
-                logging.error("Failed")
-
-    else:
-
-        time.sleep(5)
-        channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
-
-    return
-
 
 
 def callback(channel, method, properties, body):
@@ -169,7 +54,7 @@
     the_query = dict()
     the_query['query'] = dict()
     the_query['query']['term'] = {'editorial-metadata.isSample': True}
-    the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.meta')}
+    the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.sample')}
 
     logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug'])
     try:
@@ -204,7 +89,7 @@ def callback(channel, method, properties, body):
         for doc in docs_to_index:
 
             doc['editorial-metadata']['isSample'] = True
-            doc['uuid'] = uuid.replace('.full', '.meta')
+            doc['uuid'] = uuid.replace('.full', '.sample')
 
             es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
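
Taken together, these hunks retire the '.meta' uuid suffix: metadata-only versions produced by metadata-processor.py are now indexed with the '.full' suffix like every other dataset version, the reindexer creates a sampling task for every reindexed record (the '.full' guard and the lock-removal branch are removed), and sample-generator.py derives the uuids of its reduced copies with a '.sample' suffix instead. A minimal sketch of the resulting naming convention, assuming uuids shaped like those in the patch; the helper name below is hypothetical and not part of the codebase:

    def sample_uuid(full_uuid):
        # Hypothetical helper mirroring the uuid.replace() calls in the patch:
        # the '.sample' copy of a dataset version is derived from its '.full' uuid.
        return full_uuid.replace('.full', '.sample')

    assert sample_uuid('some-dataset-uuid.full') == 'some-dataset-uuid.sample'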