Commit 3f4f51ad authored by Alessandro Cerioni

First version, not yet tested, in which all datasets are first indexed as "full" versions (the GeoSource UUID concatenated with ".full") and then sampled (".sample").
parent 04eb73a7
Branches full-and-sample-docs
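The commit message describes a two-step naming scheme: every dataset is indexed first under its GeoSource UUID plus a ".full" suffix, and a reduced copy is later written under a ".sample" suffix. A minimal sketch of this convention follows (the helper names and example uuid are hypothetical; only the ".full" and ".sample" suffixes come from the commit):

# Minimal sketch of the uuid suffix scheme described above; the helper
# names and the example uuid are illustrative, not from the repository.

def full_uuid(geosource_uuid: str) -> str:
    # every dataset is first indexed as a "full" document
    return geosource_uuid + '.full'

def sample_uuid(full_doc_uuid: str) -> str:
    # a lightweight "sample" document is then derived from the full one
    return full_doc_uuid.replace('.full', '.sample')

uuid = full_uuid('abcd-1234')      # -> 'abcd-1234.full'
print(sample_uuid(uuid))           # -> 'abcd-1234.sample'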
@@ -372,7 +372,7 @@ def callback( channel, method, properties, body ):
     if not wfs_found:
         meta_version = out_record.copy() # including metadata ONLY
-        meta_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.meta'
+        meta_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.full'
         meta_version['type'] = out_record['metadata-fr']['type']
         meta_version['slug'] = the_slug
         meta_version['last_update'] = last_update
......
@@ -21,7 +21,7 @@ class NotEmptyQueueException(Exception):
 def create_sampling_task(cfg, channel, uuid):
-    # here-below we generate a task for the sample generator (full -> meta)
+    # here-below we generate a task for the sample generator (full -> sample)
     msg = dict()
     msg['header'] = dict()
     msg['header']['cfg'] = cfg
@@ -233,16 +233,9 @@ def on_msg_callback(channel, method, properties, body):
         reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task'])
         logging.info("Created reindex task: {0}".format(reindex_task_url))

-        # 3. create sampling task (full -> meta)
-        if '.full' in uuid:
-            create_sampling_task(cfg, channel, uuid)#, reindex_task_url)
-            logging.info("Created sampling task.")
-        # otherwise, remove the lock
-        else:
-            logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.meta', ''))
-            unlock( cfg['session']['working_directory'], uuid.replace('.meta', ''))
+        # 3. create sampling task (full -> sample)
+        create_sampling_task(cfg, channel, uuid)#, reindex_task_url)
+        logging.info("Created sampling task.")
     else:
         channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
......
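For context, the hunk above shows only the first lines of the task message built by create_sampling_task. A hedged sketch of how such a task might be completed and published over RabbitMQ follows; the queue name, the msg['body'] field, and the pika usage are assumptions, while msg['header']['cfg'] and the msgpack serialization (visible as msgpack.unpackb in the sample generator below) come from the diff.

# Hedged sketch: publishing the sampling task over RabbitMQ with pika.
import msgpack
import pika

def create_sampling_task(cfg, channel, uuid):
    msg = dict()
    msg['header'] = dict()
    msg['header']['cfg'] = cfg
    msg['body'] = uuid  # assumption: the payload is the '.full' uuid

    # matches msgpack.unpackb(body, raw=False) on the consumer side
    the_body = msgpack.packb(msg, use_bin_type=True)
    channel.basic_publish(exchange='',                  # hypothetical: default exchange
                          routing_key='sampling_tasks',  # hypothetical queue name
                          body=the_body,
                          properties=pika.BasicProperties(delivery_mode=2))  # persistent message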
@@ -14,121 +14,6 @@ from lib.exit_gracefully import exit_gracefully
 from lib.my_logging import logging
 from lib.locker import unlock

-def old_callback(channel, method, properties, body):
-
-    decoded_body = msgpack.unpackb(body, raw=False)
-    print(decoded_body)
-
-    reindex_task_url = decoded_body['reindex_task_url']
-
-    res = requests.get(reindex_task_url)
-    data = res.json()
-
-    print( json.dumps(data, indent=4) )
-
-    if data['completed'] == True:
-        print(True)
-
-        # delete all the already existing samples
-        destin_es = Elasticsearch([decoded_body['dest_url']], timeout=60)
-        destin_es.indices.refresh(index=decoded_body['dest_index'])
-
-        the_query = dict()
-        the_query['query'] = dict()
-        the_query['query']['term'] = {'editorial-metadata.isSample': True}
-
-        res = destin_es.delete_by_query(decoded_body['dest_index'], doc_type='_doc', body=the_query)
-
-        print( json.dumps(res) )
-
-        #exit(1)
-
-        # get sample records from the ingest index
-        source_es = Elasticsearch([decoded_body['source_url']], timeout=60)
-
-        the_query = dict()
-        the_query['size'] = 0
-        # the_query['query'] = dict()
-        # the_query['query']['exists'] = dict()
-        # the_query['query']['exists']['field'] = "data-fr.properties.address.addressCountry"
-        the_query['aggs'] = dict()
-        the_query['aggs']['my-aggregation'] = dict()
-        the_query['aggs']['my-aggregation']['terms'] = dict()
-        the_query['aggs']['my-aggregation']['terms']['field'] = "uuid"
-        the_query['aggs']['my-aggregation']['terms']['size'] = 2000
-
-        res = source_es.search(decoded_body['source_index'], '_doc', the_query)
-
-        # print( json.dumps(res['aggregations'], indent=4) )
-        print( json.dumps(res['aggregations']['my-aggregation']['buckets'], indent=4) )
-
-        full_uuids = []
-        for bucket in res['aggregations']['my-aggregation']['buckets']:
-            if '.full' in bucket['key']:
-                full_uuids.append(bucket['key'])
-
-        print(full_uuids)
-
-        for uuid in full_uuids:
-            print(uuid)
-
-            the_query = dict()
-            the_query['size'] = 10
-            the_query['query'] = dict()
-            the_query['query']['match'] = {'uuid': uuid}
-
-            res = source_es.search(decoded_body['source_index'], '_doc', the_query)
-            print( len(res['hits']['hits']) )
-
-            docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ]
-
-            t1 = time.time()
-
-            # push sample records to the destination index
-            es_body = ''
-            header = { "index" : { "_index" : decoded_body['dest_index'], "_type" : "_doc" } }
-            for doc in docs_to_index:
-                # try:
-                #     header['index']['_id'] = doc['_id'] #hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-                #     del doc['_id']
-                # except:
-                #     header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-                #header['index']['_id'] = hashlib.md5( json.dumps(doc, sort_keys=True).encode("utf-8") ).hexdigest()
-                doc['editorial-metadata']['isSample'] = True
-                doc['uuid'] = uuid.replace('.full', '.meta')
-                es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
-
-            rep = destin_es.bulk(body=es_body)
-
-            t2 = time.time()
-
-            if rep['errors'] == False:
-                channel.basic_ack(delivery_tag = method.delivery_tag)
-                logging.info("Done in %s seconds." % (t2-t1))
-                destin_es.indices.refresh(index=decoded_body['dest_index'])
-            else:
-                channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
-                logging.error(json.dumps(rep, indent=4))
-                logging.error("Failed")
-    else:
-        time.sleep(5)
-        channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
-
-    return

 def callback(channel, method, properties, body):
@@ -169,7 +54,7 @@ def callback(channel, method, properties, body):
         the_query = dict()
         the_query['query'] = dict()
         the_query['query']['term'] = {'editorial-metadata.isSample': True}
-        the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.meta')}
+        the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.sample')}

         logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug'])
         try:
@@ -204,7 +89,7 @@ def callback(channel, method, properties, body):
         for doc in docs_to_index:
             doc['editorial-metadata']['isSample'] = True
-            doc['uuid'] = uuid.replace('.full', '.meta')
+            doc['uuid'] = uuid.replace('.full', '.sample')
             es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))
......
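Read together, the two fragments above amount to the following full-to-sample routine. This is a sketch, not the repository's code: it assumes `es` is an elasticsearch-py client and `docs_to_index` already holds the hits fetched for the given '.full' uuid, with index, field, and variable names taken from the diff.

# Sketch of the full -> sample step after this change. Assumptions:
# `es` is an elasticsearch.Elasticsearch client, `dest_index` is the
# destination index, and `docs_to_index` holds the documents fetched
# for the given '.full' uuid; field names come from the diff above.
import json

def index_samples(es, dest_index, uuid, docs_to_index):
    # 1. delete any previously generated '.sample' documents for this dataset
    the_query = dict()
    the_query['query'] = dict()
    the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.sample')}
    es.delete_by_query(index=dest_index, body=the_query)

    # 2. re-index the sampled documents, flagged and renamed as '.sample'
    header = {"index": {"_index": dest_index, "_type": "_doc"}}
    es_body = ''
    for doc in docs_to_index:
        doc['editorial-metadata']['isSample'] = True
        doc['uuid'] = uuid.replace('.full', '.sample')
        es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc))

    return es.bulk(body=es_body)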