Skip to content
Snippets Groups Projects
Commit 7b7666b1 authored by Alessandro Cerioni's avatar Alessandro Cerioni
Browse files

Add a lock to prevent concurrent indexations of the same dataset

parent 5d1ab65f
No related branches found
No related tags found
No related merge requests found
......@@ -33,10 +33,10 @@ services:
main:
build: .
image: data-grandlyon-com-indexer
command: python main.py --host rabbitmq --exchange download_data_grandlyon_com_index
command: python main.py --host rabbitmq --exchange download_data_grandlyon_com_index --loglevel DEBUG
volumes:
- ./config.yaml:/app/config.yaml:ro
#- working-directory:/app/output
- working-directory:/app/output
volumes:
working-directory:
......@@ -48,16 +48,16 @@ services:
build: .
image: data-grandlyon-com-indexer
command: python workers/reindexer.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue reindex_tasks --loglevel DEBUG
# volumes:
# - ./config.yaml:/app/config.yaml:ro
volumes:
- working-directory:/app/output
restart: unless-stopped
sampler:
build: .
image: data-grandlyon-com-indexer
command: python workers/sample-generator.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue sampling_tasks --loglevel DEBUG
# volumes:
# - ./config.yaml:/app/config.yaml:ro
volumes:
- working-directory:/app/output
restart: unless-stopped
volumes:
......
import os
import json
import datetime
from .my_logging import logging
#import logging
# Absolute path of the directory containing this module.
fileDir = os.path.dirname(os.path.abspath(__file__))
# Parent of that directory; the working_directory arguments below are
# resolved relative to it when building lock-file paths.
parentDir = os.path.dirname(fileDir)
def lock( working_directory, uuid ):
    """Create a lock file named '<uuid>.lock' inside working_directory.

    The file contains a small JSON document recording the UTC creation
    time, so stale locks can be identified by inspection.
    """
    timestamp = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    lock_path = os.path.join(parentDir, working_directory, '%s.lock' % uuid)
    with open(lock_path, 'w') as lock_file:
        json.dump({'created_at': timestamp}, lock_file)
def is_locked( working_directory, uuid ):
    """Return True if a '<uuid>.lock' file exists in working_directory."""
    lock_path = os.path.join(parentDir, working_directory, '%s.lock' % uuid)
    return os.path.isfile(lock_path)
def unlock( working_directory, uuid ):
    """Remove the '<uuid>.lock' file for the given uuid.

    Uses EAFP (try/except) instead of an isfile() check followed by
    remove(): the original check-then-delete had a race window in which
    a concurrent unlock could make os.remove raise. A missing lock file
    is logged as an error, exactly as before.
    """
    f = os.path.join(parentDir, working_directory, '%s.lock' % uuid)
    try:
        os.remove( f )
    except FileNotFoundError:
        logging.error("Lock file %s not found!" % f)
if __name__ == "__main__":
    # Smoke test: create a lock, keep it visible briefly, then remove it.
    import uuid
    import time
    test_uuid = uuid.uuid4()
    lock('.', test_uuid)
    if not is_locked('.', test_uuid):
        print('Something went wrong when trying to lock...')
    else:
        print('The file %s.lock will be visible for the next 5 seconds...' % test_uuid)
        time.sleep(5)
        print('Removing the lock...')
        unlock('.', test_uuid)
        print('...done.')
......@@ -11,6 +11,7 @@ from collections import OrderedDict
from lib.geonetwork_helper import get_metadata_records
from lib.exit_gracefully import exit_gracefully
from lib.my_logging import logging
from lib.locker import lock, unlock, is_locked
def setup_indices(cfg):
......@@ -218,8 +219,16 @@ def main(cfg):
the_filter=uuids_to_filter_out,
username=username, password=password ):
the_uuid = record['geonet:info']['uuid']
if is_locked( cfg['session']['working_directory'], the_uuid ):
logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid)
continue
else:
lock(cfg['session']['working_directory'], the_uuid)
# delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
delete_dataset_from_dest_index(cfg, the_uuid)
send_record_to_the_metadata_processor(cfg, record)
else:
......@@ -231,6 +240,14 @@ def main(cfg):
the_filter=uuids_to_filter_out,
username=username, password=password ):
the_uuid = record['geonet:info']['uuid']
if is_locked( cfg['session']['working_directory'], the_uuid ):
logging.info("Dataset with uuid = %s is already being indexed: skipping." % the_uuid)
continue
else:
lock(cfg['session']['working_directory'], the_uuid)
# delete_dataset_from_indices(cfg, record['geonet:info']['uuid'])
delete_dataset_from_dest_index(cfg, record['geonet:info']['uuid'])
send_record_to_the_metadata_processor(cfg, record)
......
......@@ -11,6 +11,7 @@ newPath = os.path.join(parentDir)
sys.path.append(newPath)
from lib.my_logging import logging
from lib.exit_gracefully import exit_gracefully
from lib.locker import unlock
class NotEmptyQueueException(Exception):
pass
......@@ -192,6 +193,10 @@ def on_msg_callback(channel, method, properties, body):
if '.full' in uuid:
create_sampling_task(cfg, channel, uuid)#, reindex_task_url)
logging.info("Created sampling task.")
# otherwise, remove the lock
else:
logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.meta', ''))
unlock( cfg['session']['working_directory'], uuid.replace('.meta', ''))
......
......@@ -12,7 +12,7 @@ newPath = os.path.join(parentDir)
sys.path.append(newPath)
from lib.exit_gracefully import exit_gracefully
from lib.my_logging import logging
from lib.locker import unlock
def old_callback(channel, method, properties, body):
......@@ -209,6 +209,8 @@ def callback(channel, method, properties, body):
channel.basic_ack(delivery_tag = method.delivery_tag)
logging.info("Done in %s seconds." % (t2-t1))
destin_es.indices.refresh(index=cfg['reindexer']['destination_index'])
logging.info("Removing lock for dataset with uuid = %s." % uuid.replace('.full', ''))
unlock( cfg['session']['working_directory'], uuid.replace('.full', '') )
else:
channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
logging.error(json.dumps(rep, indent=4))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment