Commit 0576d520 authored by Alessandro Cerioni

Updated tools

parent b1ac3105
version: "3.0"
services:
delete-queues:
queues-remover:
build: .
image: data-grandlyon-com-indexer
command: python tools/delete-queues.py --host rabbitmq --exchange download_data_grandlyon_com_index
command: python tools/queues_remover.py --host rabbitmq --exchange download_data_grandlyon_com_index
volumes:
- ${PWD}/config.yaml:/app/config.yaml:ro
delete-indices:
locks-remover:
build: .
image: data-grandlyon-com-indexer
command: python tools/delete-indices.py
command: python tools/locks_remover.py
volumes:
- ${PWD}/config.yaml:/app/config.yaml:ro
- working-directory:/app/output
setup-indices:
delete-indices:
build: .
image: data-grandlyon-com-indexer
command: python tools/setup-indices.py
command: python tools/delete-indices.py
volumes:
- ${PWD}/config.yaml:/app/config.yaml:ro
field-type-detector:
build: .
image: data-grandlyon-com-indexer
command: python tools/field-type-detector.py
command: python tools/field_type_detector.py
volumes:
- ${PWD}/config.yaml:/app/config.yaml:ro
- working-directory:/app/output
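The renamed services now match the scripts they wrap: the tool file names use underscores instead of hyphens (queues_remover.py, locks_remover.py, field_type_detector.py), and the locks-remover service mounts the working-directory volume, presumably because that is where the lock to be removed lives. These services are one-shot jobs rather than daemons; each can be launched on demand with, for instance, docker-compose run --rm locks-remover.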
tools/locks_remover.py
import os, sys

# make the project root importable, so that the lib package can be found
fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
sys.path.append(parentDir)

from lib.locker import unlock
from lib.my_logging import logging


def main(cfg):
    # release the lock held on the session working directory
    unlock( cfg['session']['working_directory'] )
    return


if __name__ == '__main__':
    import argparse
    from yaml import load
    try:
        from yaml import CLoader as Loader
    except ImportError:
        from yaml import Loader

    parser = argparse.ArgumentParser(description='Locks remover')
    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
    args = parser.parse_args()
    logging.getLogger().setLevel(args.loglevel)

    # read and parse the configuration
    with open("config.yaml", "r") as yamlfile:
        cfg = load(yamlfile, Loader=Loader)

    logging.info("Starting...")
    try:
        main(cfg)
        logging.info('Done!')
    except Exception as e:
        logging.error(e)
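The lib.locker module is not part of this commit, so the exact locking scheme is not visible here. As a rough idea of what unlock has to undo, here is a minimal sketch, assuming the lock is a plain marker file dropped into the session working directory; the .lock file name and the lock() counterpart are hypothetical:

import os

LOCK_FILENAME = '.lock'  # assumed name, for illustration only; the real module may differ

def lock(working_directory):
    # 'x' mode fails if the file already exists, i.e. if another session holds the lock
    with open(os.path.join(working_directory, LOCK_FILENAME), 'x') as lockfile:
        lockfile.write(str(os.getpid()))

def unlock(working_directory):
    # remove the lock file if present, so that a crashed session can be cleaned up
    try:
        os.remove(os.path.join(working_directory, LOCK_FILENAME))
    except FileNotFoundError:
        pass

The second tool in this commit is the obsolete dataset remover, which deletes from Elasticsearch the datasets that no longer exist in GeoNetwork: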
from elasticsearch import Elasticsearch
import os, sys

# make the project root importable, so that the lib package can be found
fileDir = os.path.dirname(os.path.abspath(__file__))
parentDir = os.path.dirname(fileDir)
sys.path.append(parentDir)

from lib.geonetwork_helper import get_metadata_records
from lib.my_logging import logging


def get_uuids_from_geonetwork(cfg):
    logging.info('Getting UUIDs from GeoNetwork...')
    uuids = set()
    for record in get_metadata_records( cfg['geonetwork']['url'],
                                        cfg['geonetwork']['records_per_page'],
                                        username=cfg['geonetwork']['username'],
                                        password=cfg['geonetwork']['password'] ):
        uuids.add( record['geonet:info']['uuid'] )
    return uuids


def get_uuids_from_elasticsearch(cfg):
    logging.info('Getting UUIDs from ES...')
    es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
    # collect the distinct uuid values through a terms aggregation (at most 2000
    # distinct values); size=0 suppresses the hits, as only the buckets are needed
    the_query = {
        'size': 0,
        'aggs': {
            'my-aggregation': {
                'terms': {
                    'field': 'uuid.keyword',
                    'size': 2000
                }
            }
        }
    }
    res = es.search(index=cfg['reindexer']['destination_index'], doc_type='_doc', body=the_query)
    uuids = set()
    for bucket in res['aggregations']['my-aggregation']['buckets']:
        # documents are indexed as '<uuid>.meta' / '<uuid>.full': strip the suffix
        uuids.add( bucket['key'].split('.')[0] )
    return uuids
def main(cfg):
    geonetwork_uuids = get_uuids_from_geonetwork(cfg)
    elasticsearch_uuids = get_uuids_from_elasticsearch(cfg)

    # diff = geonetwork_uuids - elasticsearch_uuids
    # if len(diff) > 0:
    #     logging.info("Found in GeoNetwork only:")
    #     for el in diff:
    #         logging.info(el)

    # datasets that are still indexed but no longer catalogued are obsolete
    diff = elasticsearch_uuids - geonetwork_uuids
    if len(diff) > 0:
        logging.info("The following UUIDs were found in Elasticsearch only:")
        for el in diff:
            logging.info(el)

        logging.info("Removing spurious datasets from the destination index...")
        es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60)
        for uuid_to_remove in diff:
            # every dataset is indexed twice, once per document flavour
            for suffix in ['meta', 'full']:
                the_query = { 'query': { 'term': {'uuid.keyword': '{0}.{1}'.format(uuid_to_remove, suffix)} } }
                try:
                    es.delete_by_query(index=cfg['reindexer']['destination_index'], body=the_query)
                except Exception as e:
                    logging.error(e)
        logging.info("...done")
    return
if __name__ == "__main__":
    import argparse
    from yaml import load
    try:
        from yaml import CLoader as Loader
    except ImportError:
        from yaml import Loader

    parser = argparse.ArgumentParser(description='Obsolete dataset remover')
    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str,
                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
    args = parser.parse_args()
    logging.getLogger().setLevel(args.loglevel)

    # read and parse the configuration
    with open("config.yaml", "r") as yamlfile:
        cfg = load(yamlfile, Loader=Loader)

    try:
        main(cfg)
        logging.info('Done!')
    except Exception as e:
        logging.error(e)
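Similarly, lib.geonetwork_helper is outside this diff. Here is a minimal sketch of a compatible get_metadata_records generator, assuming GeoNetwork's classic q search service with _content_type=json; the endpoint parameters and the response shape are assumptions, not taken from this repository:

import requests

def get_metadata_records(url, records_per_page, username=None, password=None):
    # page through the catalogue, yielding one metadata record at a time;
    # callers expect each record to carry record['geonet:info']['uuid']
    auth = (username, password) if username is not None else None
    frm = 1
    while True:
        params = {'_content_type': 'json', 'from': frm, 'to': frm + records_per_page - 1}
        res = requests.get(url, params=params, auth=auth)
        res.raise_for_status()
        records = res.json().get('metadata', [])
        # GeoNetwork returns a bare object instead of a list when a page holds a single record
        if isinstance(records, dict):
            records = [records]
        if len(records) == 0:
            return
        for record in records:
            yield record
        frm += records_per_page

Both tools expect config.yaml in the current working directory, which is why the compose services mount ${PWD}/config.yaml into /app. Outside Docker they can be run directly, e.g. python tools/locks_remover.py --loglevel DEBUG.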