diff --git a/workers/doc-enricher-oo.py b/workers/doc-enricher-oo.py
deleted file mode 100644
index 0dc47ce308366e33248f9bb913f0529f2e17b7f3..0000000000000000000000000000000000000000
--- a/workers/doc-enricher-oo.py
+++ /dev/null
@@ -1,402 +0,0 @@
-import pika
-import msgpack
-import requests
-import json
-import datetime
-import os, sys
-from sqlalchemy.exc import NoSuchTableError
-
-fileDir = os.path.dirname(os.path.abspath(__file__))
-parentDir = os.path.dirname(fileDir)
-newPath = os.path.join(parentDir)
-sys.path.append(newPath)
-from lib.exit_gracefully import exit_gracefully
-from lib.my_logging import logging
-from lib.postgis_helper import Remote
-from lib.serializers import encode_datetime
-
-from lib.rabbit_session import RabbitSession
-from lib.log_message import LogMessage
-
-
-class DocEnricher:
-
-    def __init__(self, cfg):
-        self.cfg = cfg
-        self.rabbit = None
-
-    def truncate(self, n, decimals=0):
-        multiplier = 10 ** decimals
-        return int(n * multiplier) / multiplier
-
-    def get_entries_from_postgis(self, link, cfg, no_features_per_page=1000):
-
-        dbname = link['url'].split('/')[-1]
-        schema, table_name = link['name'].split('.')
-
-        logging.info('Getting data from database %s...' % dbname)
-        logging.info('Establishing a database connection...')
-        pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password'])
-        logging.info('Done.')
-
-        try:
-            table = pg.get_table(table_name, schema=schema)
-        except NoSuchTableError:
-            logging.debug('Table %s in schema %s not found :-(' % (table_name, schema))
-            return
-
-        count = pg.count_entries(table)
-
-        no_pages = count//no_features_per_page+1
-        logging.info('Getting %i entries in %i pages from table %s.%s...' % (count, no_pages, schema, table_name))
-
-        feature_page = [] # we accumulate entries in this sort of buffer
-        cnt = 0
-        for entry in pg.get_entries(table):
-            feature_page.append(entry)
-            if len(feature_page) == no_features_per_page:
-                cnt += 1
-                logging.debug('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page)))
-                yield (cnt/no_pages, count, feature_page)
-                feature_page = []
-
-        logging.debug('Yielding last page with %i features' % len(feature_page))
-        yield (1, count, feature_page) # this will be the last feature_page
-
-        return
-
-
-    def get_wfs(self, link, credentials, offset=0, no_features_per_page=1000):
-
-        root_url = link['url']
-        print(offset, no_features_per_page)
-
-        params = {}
-        params['version'] = '2.0.0'
-        params['service'] = 'WFS'
-        params['outputFormat'] = 'geojson'
-        params['request'] = 'GetFeature'
-        params['maxFeatures'] = no_features_per_page
-        params['typeName'] = link['name']
-        # params['sortBy'] = 'gid' # TODO: is it safe not to force any sortBy?
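For reference, the WFS paging contract used by get_wfs is driven by the 'maxFeatures' value set above and the 'startindex' offset set just below; with a hypothetical no_features_per_page = 1000 the successive requests look, schematically, like:

    startindex=0,    maxFeatures=1000   ->  features    1..1000
    startindex=1000, maxFeatures=1000   ->  features 1001..2000

Paging stops as soon as a response carries fewer than maxFeatures features (old_enrich_docs below re-enqueues a task with offset + features_per_page until that happens).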
- #params['startindex'] = 11 - params['SRSNAME'] = 'epsg:4326' - - #startindex = 0 - cnt = offset / no_features_per_page + 1 - - - logging.info('WFS page %i; offset = %i' % (cnt, offset)) - params['startindex'] = offset #0 + cnt*no_features_per_page - #params['to'] = params['from'] + no_records_per_page - 1 - - with_credentials = False - for domain in credentials: - if domain in link['url'] and credentials[domain]['username'] != None and credentials[domain]['password']: - logging.info('Found a valid credential.') - with_credentials = True - username = credentials[domain]['username'] - password = credentials[domain]['password'] - break - - if with_credentials: - res = requests.get(root_url, params=params, auth=(username, password)) - else: - res = requests.get(root_url, params=params) - - logging.debug(res.url) - - try: - # print(res.status_code) - # print(res.text) - # print(res.json()) - features = res.json()['features'] - #processed_features = process_features(features) - logging.debug(len(features)) - - return features - - # if len(features) < no_features_per_page: - # break # it means that we have reached the last page - # - # cnt += 1 - - - except: #json.decoder.JSONDecodeError: # it means that we the WFS request failed, for instance because of insufficient right access - #logging.error("Failed WFS request: %s" % res.url) - logging.error("Failed WFS request: %s" % res.url) - - #yield None - - #raise Exception("Failed WFS request: %s" % res.url) - - return None - - #print() - - def old_enrich_docs( self, channel, method, properties, body, **kwargs ): - - decoded_body = msgpack.unpackb(body, raw=False) - - wfs_info = decoded_body['header']['wfs_info'] - offset = decoded_body['header']['offset'] - session_id = decoded_body['header']['session_id'] - dest_index = decoded_body['header']['dest_index'] - - logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title']) - - feature_page = self.get_wfs(wfs_info, kwargs['credentials'], offset, kwargs['features_per_page']) - - # we implement pagination by letting this program creating tasks for itself / its siblings - if feature_page != None and len(feature_page) == kwargs['features_per_page']: # at least another page is needed - msg = {'header': {'wfs_info': wfs_info, 'offset': offset+kwargs['features_per_page'], 'session_id': session_id, 'dest_index': dest_index}, 'body': decoded_body['body']} - the_body = msgpack.packb(msg, use_bin_type=True) - - channel.basic_publish( exchange=kwargs['exchange'], - routing_key=kwargs['docs_to_enrich_rk'], - body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) - ) - - - if feature_page != None: - - # try: - # #for feature_page in feature_pages: - # #print(feature_page[0]['properties']['nom_reduit']) - logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' 
% len(feature_page)) - doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page] - - msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page} - the_body = msgpack.packb(msg, use_bin_type=True) - - - channel.basic_publish( exchange=kwargs['exchange'], - routing_key=kwargs['doc_pages_to_store_in_mongo_rk'], - body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) - ) - - - channel.basic_publish( exchange=kwargs['exchange'], - routing_key=kwargs['doc_pages_to_process_rk'], - body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) - ) - - logging.info('...done!') - - # except TypeError: # it means that getWFS returned None - # pass - # # - # #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - - - - channel.basic_ack(delivery_tag = method.delivery_tag) - #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - - return #out_docs - - - def enrich_docs( self, channel, method, properties, body ): - - - decoded_body = msgpack.unpackb(body, raw=False) - - wfs_info = decoded_body['header']['wfs_info'] - cfg = decoded_body['header']['cfg'] - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-enricher', - status=decoded_body['body']['metadata-fr']['title'], - uuid_prefix='meta', - info='starting' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------------------ - - # # initialize RabbitMQ queues - # exchange = cfg['rabbitmq']['exchange'] - # doc_pages_to_process_qn = cfg['rabbitmq']['queue_name_5'] - # doc_pages_to_process_rk = cfg['rabbitmq']['routing_key_5'] - # - # del cfg['rabbitmq']['queue_name_5'] - # del cfg['rabbitmq']['routing_key_5'] - # - # channel.exchange_declare(exchange=exchange, exchange_type='direct') - # - # channel.queue_declare(queue=doc_pages_to_process_qn, durable=True) - # channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk) - - logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title']) - - postgis_cfg = cfg['postgis'] - del cfg['postgis'] # <- as this information is no longer needed - - for progress_ratio, count, feature_page in self.get_entries_from_postgis(wfs_info, postgis_cfg): - - last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") - - if len(feature_page) == 0: - count = 1 # at least we have a metadata-only document! count cannot be left = 0, otherwise the reindexer would never start - logging.debug('Empty page!') - doc_page = [{**decoded_body['body'], 'last_update': last_update}] - else: - doc_page = [{**decoded_body['body'], 'last_update': last_update, 'data-fr': feature} for feature in feature_page] - - - logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug'])) - progress_rounded = round(progress_ratio*100) - if progress_rounded % 10: - info_message = '[' +str(progress_rounded)+'%] Sending ' + str(len(doc_page)) + 'docs to RabbitMQ for dataset ' + doc_page[0]['slug'] + '...' 
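For context, the per-page hand-off implemented in enrich_docs boils down to the pattern sketched below. This is a minimal illustration using pika directly; the deleted code routes it through RabbitSession.publish_task, and the exchange/routing-key names and the inline encode_datetime helper are placeholders standing in for the configured values and for lib.serializers.encode_datetime.

import datetime
import msgpack
import pika

def publish_doc_page(channel, metadata, feature_page, cfg, progress_ratio, count):
    # every feature of the page becomes one document carrying the dataset metadata
    last_update = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
    doc_page = ([{**metadata, 'last_update': last_update, 'data-fr': f} for f in feature_page]
                or [{**metadata, 'last_update': last_update}])  # metadata-only doc for empty pages

    msg = {'header': {'cfg': cfg, 'progress_ratio': progress_ratio, 'count': count},
           'body': doc_page}

    def encode_datetime(obj):
        # PostGIS rows may carry datetime values; serialize them as ISO strings
        if isinstance(obj, datetime.datetime):
            return obj.isoformat()
        raise TypeError(repr(obj))

    channel.basic_publish(exchange='some_exchange',            # placeholder name
                          routing_key='doc_pages_to_process',  # placeholder name
                          body=msgpack.packb(msg, use_bin_type=True, default=encode_datetime),
                          properties=pika.BasicProperties(delivery_mode=2))  # persistent message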
- # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-enricher', - status=decoded_body['body']['metadata-fr']['title'], - uuid_prefix='meta', - info=info_message - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------------------ - msg = dict() - msg['header'] = dict() - msg['header']['cfg'] = cfg - msg['header']['progress_ratio'] = progress_ratio - msg['header']['count'] = count - msg['body'] = doc_page - - the_task_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime) - - # channel.basic_publish( exchange=exchange, - # routing_key=doc_pages_to_process_rk, - # body=the_body, - # properties=pika.BasicProperties(delivery_mode = 2) - # ) - # ------------------------------send-task-------------------------------------- - - # initialize RabbitMQ queues - exchange = cfg['rabbitmq']['exchange'] - doc_pages_to_process_qn = cfg['rabbitmq']['queue_name_5'] - doc_pages_to_process_rk = cfg['rabbitmq']['routing_key_5'] - - - - self.rabbit.publish_task(the_body=the_task_body, - exchange=exchange, - routing_key=doc_pages_to_process_rk, - queue_name=doc_pages_to_process_qn) - - # ------------------------------------------------------------------------- - - #logging.info('...done!') - - # except TypeError: # it means that getWFS returned None - # pass - # # - # #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - - # channel.basic_ack(delivery_tag = method.delivery_tag) - #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-enricher', - status='Terminated.', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------------------ - del cfg['rabbitmq']['queue_name_5'] - del cfg['rabbitmq']['routing_key_5'] - return #out_docs - - def main(self): - - with RabbitSession(self.cfg) as self.rabbit: - # ------------------------------------------------------------ - docs_to_enrich_qn = cfg['rabbitmq_queue'] - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.enrich_docs, - specific_queue=docs_to_enrich_qn) - # <-- the rabbit connexion is automatically closed here - - - #from lib.close_connection import on_timeout - - # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) - # #timeout = 5 - # #connection.add_timeout(timeout, on_timeout(connection)) - # - # channel = connection.channel() - # # exchange = cfg['rabbitmq_exchange'] - # # the queue this program will consume messages from: - # docs_to_enrich_qn = cfg['rabbitmq_queue'] - # - # channel.basic_qos(prefetch_count=1) - # channel.basic_consume(on_message_callback=lambda ch, method, properties, body: - # enrich_docs(ch, method, properties, body), - # #doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk, - # #docs_to_enrich_rk=docs_to_enrich_rk, - # #doc_pages_to_process_rk=doc_pages_to_process_rk, - # #features_per_page=cfg['wfs']['features_per_page'], - # 
#postgis_cfg=cfg['postgis']), - # queue=docs_to_enrich_qn)#, no_ack=True) - # channel.start_consuming() - # connection.close() - - -if __name__ == '__main__': - - - import yaml - import time - import signal - import argparse - - signal.signal(signal.SIGINT, exit_gracefully) - - parser = argparse.ArgumentParser(description='Document enricher') - parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, default='rabbitmq') - parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672) - parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, - default='download_data_grandlyon_com_index') - parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, default='doc_pages_to_enrich') - parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) - - args = parser.parse_args() - - cfg = dict() - cfg['rabbitmq'] = dict() - cfg['rabbitmq_host'] = args.host - cfg['rabbitmq_port'] = args.port - cfg['rabbitmq_exchange'] = args.exchange - cfg['rabbitmq_queue'] = args.queue - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' - cfg['rabbitmq']['queue_logs_name'] = 'session_logs' - cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' - cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' - - logging.getLogger().setLevel(args.loglevel) - logging.info('Starting...') - - while True: - try: - DocEnricher(cfg).main() - except pika.exceptions.ChannelClosed: - logging.info('Waiting for tasks...') - time.sleep(5) - except pika.exceptions.AMQPConnectionError: - logging.info('Waiting for RabbitMQ to be reachable...') - time.sleep(5) - except Exception as e: - logging.error(e) - print('[xxxx] Exception', e) - time.sleep(5) - exit(1) diff --git a/workers/doc-processor-oo.py b/workers/doc-processor-oo.py deleted file mode 100644 index 8c8b27a3301becf36f853a2df809061ab8b139db..0000000000000000000000000000000000000000 --- a/workers/doc-processor-oo.py +++ /dev/null @@ -1,254 +0,0 @@ -import pika -import msgpack -import datetime -import re -import json -import os, sys - -fileDir = os.path.dirname(os.path.abspath(__file__)) -parentDir = os.path.dirname(fileDir) -newPath = os.path.join(parentDir) -sys.path.append(newPath) -from lib.flatten_utils import flatten_json, unflatten_json -from lib.type_utils import convert_to_datetime, convert_to_float, convert_to_int, convert_to_str, convert_to_boolean -from lib.exit_gracefully import exit_gracefully -from lib.my_logging import logging -from lib.serializers import decode_datetime - -from lib.rabbit_session import RabbitSession -from lib.log_message import LogMessage - - -class DocProcessor: - - def __init__(self, cfg): - self.cfg = cfg - self.rabbit = None - - def fix_field_types(self, in_docs, out_types ): - - out_docs = [] - - for in_doc in in_docs: - - # metadata = in_doc['metadata-fr'] - # data = in_doc['data-fr'] - - if 'data-fr' not in in_doc: - out_docs.append(in_doc) - continue - # - in_flattened_properties = flatten_json(in_doc['data-fr']['properties']) - - out_flattened_properties = in_flattened_properties.copy() - - for prop in in_flattened_properties.keys(): - - # remove null values - the_value = in_flattened_properties[prop] - if the_value is None or (type(the_value) is str and (the_value.strip() == '' or the_value.strip() == '-' or the_value.strip() == 'None')): - del out_flattened_properties[prop] - continue - - # 
LOOKUP, ex.: the type of a field named "thefield.12.thesubfield" can be found in the catalog by looking for "thefield.0.thesubfield" - lookup_key = re.sub(r'\.\d+', '.0', prop) - - if out_types[lookup_key] == 'str': - out_flattened_properties[prop] = convert_to_str(in_flattened_properties[prop]) - elif out_types[lookup_key] == 'int': - out_flattened_properties[prop] = convert_to_int(in_flattened_properties[prop]) - elif out_types[lookup_key] == 'float': - out_flattened_properties[prop] = convert_to_float(in_flattened_properties[prop]) - elif out_types[lookup_key] in ['date', 'datetime']: - out_flattened_properties[prop] = convert_to_datetime(in_flattened_properties[prop]).strftime('%Y-%m-%dT%H:%M:%SZ') - elif out_types[lookup_key] == 'bool': - out_flattened_properties[prop] = convert_to_boolean(in_flattened_properties[prop]) - else: - logging.critical('type %s not supported', out_types[prop]) - sys.exit(1) - - # pprint - out_doc = in_doc.copy() - out_doc['data-fr']['properties'] = unflatten_json(out_flattened_properties) - - # amending addresses which are not in the schema.org format; we use out_doc in order to avoid reintroducing null values! - if 'address' in out_doc['data-fr']['properties'].keys() and type(out_doc['data-fr']['properties']['address']) is str: - the_street_address = in_doc['data-fr']['properties']['address'] - out_doc['data-fr']['properties']['address'] = {'streetAddress': the_street_address} - logging.debug(out_doc['data-fr']['properties']['address']) - - out_docs.append(out_doc) - - return out_docs - - def process_docs(self, channel, method, properties, body): - print('inside process docs') - decoded_body = msgpack.unpackb(body, raw=False, object_hook=decode_datetime) - - - cfg = decoded_body['header']['cfg'] - progress_ratio = decoded_body['header']['progress_ratio'] - count = decoded_body['header']['count'] - docs = decoded_body['body'] - - filename = os.path.join( parentDir, cfg['session']['working_directory'], 'field_types.json' ) - - with open(filename, 'r') as fp: - field_types = json.load(fp) - - exchange = cfg['rabbitmq']['exchange'] - # the queue this program will be writing to - docs_to_index_qn = cfg['rabbitmq']['queue_name_2'] - docs_to_index_rk = cfg['rabbitmq']['routing_key_2'] - print('docs_to_index_qn', docs_to_index_qn) - - del cfg['rabbitmq']['queue_name_2'] - del cfg['rabbitmq']['routing_key_2'] - - # channel.exchange_declare(exchange=exchange, exchange_type='direct') - # channel.queue_declare(queue=docs_to_index_qn, durable=True) - # channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk) - - logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug'])) - - progress_rounded = round(progress_ratio * 100) - if progress_rounded % 10: - info_message = '[' + str(progress_rounded) + '%] Sending ' + str( - len(docs)) + 'docs to RabbitMQ for dataset ' + docs[0]['slug'] + '...' 
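The type-fixing step in fix_field_types above hinges on one normalization: positional indices in flattened keys are collapsed to 0, so every array element is coerced to the type catalogued for the first element. A minimal, self-contained sketch of that lookup, where the key names and the converter map are illustrative stand-ins for field_types.json and lib.type_utils:

import re

field_types = {'measures.0.value': 'float', 'measures.0.unit': 'str'}   # illustrative catalogue
converters = {'float': float, 'int': int, 'str': str}                   # stand-ins for lib.type_utils

flattened = {'measures.3.value': '12.5', 'measures.3.unit': 'mg/L'}

fixed = {}
for key, value in flattened.items():
    lookup_key = re.sub(r'\.\d+', '.0', key)   # 'measures.3.value' -> 'measures.0.value'
    fixed[key] = converters[field_types[lookup_key]](value)

print(fixed)   # {'measures.3.value': 12.5, 'measures.3.unit': 'mg/L'}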
- print('info_message:', info_message) - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-processor', - status=decoded_body['body']['metadata-fr']['title'], - uuid_prefix='meta', - info=info_message - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------------------ - - docs_to_index = self.fix_field_types( docs, field_types ) - - msg = dict() - msg['header'] = dict() - msg['header']['cfg'] = cfg - msg['header']['progress_ratio'] = progress_ratio - msg['header']['count'] = count - msg['body'] = docs_to_index - - the_task_body = msgpack.packb(msg, use_bin_type=True) - # ------------------------send task----------------------------------- - self.rabbit.publish_task(the_body=the_task_body, - exchange=exchange, - routing_key=docs_to_index_rk, - queue_name=docs_to_index_qn) - # --------------------------------------------------------------------- - - - # channel.basic_publish( exchange=exchange, - # routing_key=docs_to_index_rk, - # body=the_body, - # properties=pika.BasicProperties(delivery_mode = 2) - # ) - - logging.info('...done!') - # channel.basic_ack(delivery_tag = method.delivery_tag) - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-processor', - status='Terminated', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ - return - - def main(self): - with RabbitSession(self.cfg) as self.rabbit: - # ------------------------------------------------------------ - docs_to_enrich_qn = cfg['rabbitmq_queue'] - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.process_docs, - specific_queue=docs_to_enrich_qn) - - - # # from lib.close_connection import on_timeout - # - # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'],port=cfg['rabbitmq_port'])) - # # timeout = 5 - # # connection.add_timeout(timeout, on_timeout(connection)) - # - # channel = connection.channel() - # # exchange = cfg['rabbitmq_exchange'] - # - # # the queue this program will be consuming messages from - # doc_pages_to_process_qn = cfg['rabbitmq_queue'] - # - # channel.basic_qos(prefetch_count=1) - # channel.basic_consume(on_message_callback=lambda ch, method, properties, body: self.process_docs(ch, method, properties, body), - # # field_types=field_types, - # # exchange=exchange), - # # docs_to_index_qn=docs_to_index_qn, - # # docs_to_index_rk=docs_to_index_rk), - # queue=doc_pages_to_process_qn)# , no_ack=True) - # channel.start_consuming() - # - # connection.close() - - -if __name__ == '__main__': - - import time - import signal - import yaml - import argparse - - signal.signal(signal.SIGINT, exit_gracefully) - - parser = argparse.ArgumentParser(description='Document processor') - # parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, default='localhost') - parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, default='rabbitmq') - - parser.add_argument('--port', dest='port', help='the RabbitMQ port', 
type=int, default=5672) - parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, - default='download_data_grandlyon_com_index') - parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, default='doc_pages_to_process') - parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, - choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) - - args = parser.parse_args() - - cfg = dict() - cfg['rabbitmq'] = dict() - cfg['rabbitmq_host'] = args.host - cfg['rabbitmq_port'] = args.port - cfg['rabbitmq_exchange'] = args.exchange - cfg['rabbitmq_queue'] = args.queue - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' - cfg['rabbitmq']['queue_logs_name'] = 'session_logs' - cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' - cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' - - logging.getLogger().setLevel(args.loglevel) - logging.info('Starting...') - - while True: - try: - DocProcessor(cfg).main() - except FileNotFoundError as e: - logging.error('The field types file is not ready yet. Retrying in 5 seconds...') - time.sleep(5) - except pika.exceptions.ChannelClosed: - logging.info("Waiting for tasks...") - time.sleep(5) - except pika.exceptions.AMQPConnectionError: - logging.info('Waiting for RabbitMQ to be reachable...') - time.sleep(5) - except Exception as e: - logging.error(e) - time.sleep(5) - exit(1) diff --git a/workers/reindexer-oo.py b/workers/reindexer-oo.py deleted file mode 100644 index 3e24a39f3e4df4482a53f9abacd5a41c504ba1d0..0000000000000000000000000000000000000000 --- a/workers/reindexer-oo.py +++ /dev/null @@ -1,246 +0,0 @@ -import time -import json -import msgpack -import pika -import os, sys -from elasticsearch import Elasticsearch, NotFoundError - -fileDir = os.path.dirname(os.path.abspath(__file__)) -parentDir = os.path.dirname(fileDir) -newPath = os.path.join(parentDir) -sys.path.append(newPath) -from lib.my_logging import logging -from lib.exit_gracefully import exit_gracefully -from lib.locker import unlock - -from lib.rabbit_session import RabbitSession -from lib.log_message import LogMessage - -class NotEmptyQueueException(Exception): - pass - - -class Reindexer : - - def __init__(self, cfg): - self.cfg = cfg - self.rabbit = None - - def create_sampling_task(self, cfg, channel, uuid): - - # here-below we generate a task for the sample generator (full -> meta) - msg = dict() - msg['header'] = dict() - msg['header']['cfg'] = cfg - #msg['header']['reindex_task_url'] = reindex_task_url - msg['body'] = uuid - - the_task_body = msgpack.packb(msg, use_bin_type=True) - - exchange = cfg['rabbitmq']['exchange'] - - queue_name = cfg['rabbitmq']['queue_name_6'] - routing_key = cfg['rabbitmq']['routing_key_6'] - - # ------------------------send task----------------------------------- - self.rabbit.publish_task(the_body=the_task_body, - exchange=exchange, - routing_key=routing_key, - queue_name=queue_name) - # --------------------------------------------------------------------- - - return - - def on_msg_callback(self, channel, method, properties, body): - - - decoded_body = msgpack.unpackb(body, raw=False) - - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='reindexer', - status='Starting...', - uuid_prefix='meta', - info='no 
info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ - - cfg = decoded_body['header']['cfg'] - uuid = decoded_body['body'] - count_ref = decoded_body['header']['count'] - - - if 'source_url' in cfg['reindexer'].keys(): - es_source = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) - else: - es_source = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) - - - - the_query = dict() - the_query['query'] = dict() - the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)} - - es_source.indices.refresh(index=cfg['reindexer']['source_index']) - count_es = es_source.count(cfg['reindexer']['source_index'], body=the_query).get('count') - - - - if count_es != count_ref: - logging.warning('Documents are still being pushed to the source index for dataset with uuid = %s' % uuid) - logging.debug('count_es = %i; count_ref = %i' % (count_es, count_ref)) - time.sleep(5) - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - return - - - # 1. remove already existing docs from destination index - logging.info("Removing dataset with uuid = %s from the destination index..." % uuid) - - es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) - index = cfg['reindexer']['destination_index'] - - try: - es.indices.refresh(index=index) - except NotFoundError: - # the destination index may not be already present - pass - - the_query = dict() - the_query['query'] = dict() - the_query['query']['term'] = {'uuid.keyword': '{0}'.format(uuid)} - - try: - res = es.delete_by_query(index, doc_type='_doc', body=the_query) - logging.debug(res) - res = es.indices.refresh(index=index) - logging.debug(res) - except NotFoundError: - pass - except Exception as e: - logging.error(e) - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - return - - - # 3. trigger reindexation - - body = { - "source": { - "index": cfg['reindexer']['source_index'], - "query": { - "term": {"uuid.keyword": '{0}'.format(uuid)} - }, - "type": "_doc", - "size": 1000 - }, - "dest": { - "index": cfg['reindexer']['destination_index'], - "type": "_doc" - } - } - - if 'source_url' in cfg['reindexer'].keys(): - body['source']['remote'] = {'host': cfg['reindexer']['source_url']} - - rep = es.reindex(body, wait_for_completion=False) - - logging.debug(rep) - - if 'task' in rep: - channel.basic_ack(delivery_tag=method.delivery_tag) - #print("") - reindex_task_url = "{0}/_tasks/{1}".format(cfg['reindexer']['destination_url'], rep['task']) - logging.info("Created reindex task: {0}".format(reindex_task_url)) - - # 3. create sampling task (full -> meta) - if '.full' in uuid: - self.create_sampling_task(cfg, channel, uuid)#, reindex_task_url) - logging.info("Created sampling task.") - # otherwise, remove the lock - else: - logging.info("Removing lock for dataset with uuid = %s." 
% uuid.replace('.meta', '')) - unlock( cfg['session']['working_directory'], uuid.replace('.meta', '')) - - - - else: - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - #print("") - logging.error(json.dumps(rep, indent=4)) - logging.error("Failed") - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='reindexer', - status='Terminated', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ - - return - - def main(self): - - with RabbitSession(self.cfg) as self.rabbit: - # ------------------------------------------------------------ - docs_to_enrich_qn = cfg['rabbitmq_queue'] - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.on_msg_callback, - specific_queue=docs_to_enrich_qn) - - - return - - -if __name__ == '__main__': - - import yaml - import time - import signal - import argparse - - signal.signal(signal.SIGINT, exit_gracefully) - - parser = argparse.ArgumentParser(description='Incremental reindexer') - parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True) - parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672) - parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) - parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) - parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) - - args = parser.parse_args() - - cfg = dict() - cfg['rabbitmq'] = dict() - cfg['rabbitmq_host'] = args.host - cfg['rabbitmq_port'] = args.port - cfg['rabbitmq_exchange'] = args.exchange - cfg['rabbitmq_queue'] = args.queue - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' - cfg['rabbitmq']['queue_logs_name'] = 'session_logs' - cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' - cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' - - logging.getLogger().setLevel(args.loglevel) - logging.info('Starting...') - - while True: - try: - Reindexer(cfg).main() - except pika.exceptions.ChannelClosed: - logging.info("Waiting for tasks...") - time.sleep(5) - except pika.exceptions.AMQPConnectionError: - logging.info('Waiting for RabbitMQ to be reachable...') - time.sleep(5) - except Exception as e: - logging.error(e) - time.sleep(5) - exit(1) diff --git a/workers/sample-generator-oo.py b/workers/sample-generator-oo.py deleted file mode 100644 index a10e22a223e0585056fc9ee008aef1314e818209..0000000000000000000000000000000000000000 --- a/workers/sample-generator-oo.py +++ /dev/null @@ -1,195 +0,0 @@ -import pika -import msgpack -import requests -import json -import time -import os, sys -from elasticsearch import Elasticsearch -from elasticsearch.exceptions import AuthorizationException - -fileDir = os.path.dirname(os.path.abspath(__file__)) -parentDir = os.path.dirname(fileDir) -newPath = os.path.join(parentDir) -sys.path.append(newPath) -from lib.exit_gracefully import exit_gracefully -from lib.my_logging import logging -from lib.locker import unlock - -from lib.rabbit_session import RabbitSession -from lib.log_message import LogMessage - - 
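For context on the reindexer hunk above: the incremental copy is a _reindex restricted to a single dataset uuid, launched asynchronously and then tracked through the tasks API. A minimal sketch with elasticsearch-py, where the URLs, index names and uuid are placeholders:

from elasticsearch import Elasticsearch

es = Elasticsearch(['http://destination-es:9200'], timeout=60)    # placeholder URL
uuid = 'some-dataset-uuid.full'                                    # placeholder uuid

body = {'source': {'index': 'source-index',                        # placeholder name
                   'query': {'term': {'uuid.keyword': uuid}},
                   'size': 1000},
        'dest': {'index': 'destination-index'}}                    # placeholder name

# when the source index lives on another cluster, _reindex pulls from a remote host:
# body['source']['remote'] = {'host': 'http://source-es:9200'}

rep = es.reindex(body, wait_for_completion=False)
if 'task' in rep:
    print('reindex task: http://destination-es:9200/_tasks/' + rep['task'])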
-class Sampler: - - def __init__(self, cfg): - self.cfg = cfg - self.rabbit = None - - - def callback(self, channel, method, properties, body): - - sample_size = 10 - - decoded_body = msgpack.unpackb(body, raw=False) - - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='sampler', - status='Starting...', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ - - cfg = decoded_body['header']['cfg'] - #reindex_task_url = decoded_body['header']['reindex_task_url'] - uuid = decoded_body['body'] - - # get sample records from the ingest index - source_es = Elasticsearch([cfg['reindexer']['source_url']], timeout=60) - - the_query = dict() - the_query['size'] = sample_size - the_query['query'] = dict() - the_query['query']['term'] = {'uuid.keyword': uuid} - - res = source_es.search(cfg['reindexer']['source_index'], '_doc', the_query) - - docs_to_index = [ doc['_source'] for doc in res['hits']['hits'] ] - - if len(docs_to_index) == 0: - logging.error('Zero documents found for dataset with uuid = %s: sleeping for 5 seconds...' % (uuid)) - time.sleep(5) - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - return - - # delete the already existing samples - destin_es = Elasticsearch([cfg['reindexer']['destination_url']], timeout=60) - destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) - - the_query = dict() - the_query['query'] = dict() - the_query['query']['term'] = {'editorial-metadata.isSample': True} - the_query['query']['term'] = {'uuid.keyword': uuid.replace('.full', '.meta')} - - logging.info("Deleting already existing samples for dataset with slug = %s" % docs_to_index[0]['slug']) - try: - res = destin_es.delete_by_query(cfg['reindexer']['destination_index'], doc_type='_doc', body=the_query) - logging.debug(res) - res = destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) - logging.debug(res) - except AuthorizationException: # TODO correct this "Unresolved reference for AuthorizationException" - time.sleep(5) - channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1) - return - except Exception as e: - logging.error("Exception:") - logging.error(e) - logging.error("Exiting.") - exit(1) - - t1 = time.time() - - # push sample records to the destination index - es_body = '' - header = { "index" : { "_index" : cfg['reindexer']['destination_index'], "_type" : "_doc" } } - for doc in docs_to_index: - - doc['editorial-metadata']['isSample'] = True - doc['uuid'] = uuid.replace('.full', '.meta') - - es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc)) - - logging.info("Pushing {0} samples to Elasticsearch for dataset {1}...".format(len(docs_to_index), docs_to_index[0]['slug'])) - rep = destin_es.bulk(body=es_body) - - t2 = time.time() - - if rep['errors'] == False: - channel.basic_ack(delivery_tag = method.delivery_tag) - logging.info("Done in %s seconds." % (t2-t1)) - destin_es.indices.refresh(index=cfg['reindexer']['destination_index']) - logging.info("Removing lock for dataset with uuid = %s." 
% uuid.replace('.full', '')) - unlock( cfg['session']['working_directory'], uuid.replace('.full', '') ) - else: - channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - logging.error(json.dumps(rep, indent=4)) - logging.error("Failed") - - # else: - # - # time.sleep(5) - # channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='sampler', - status='Terminated', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ - return - - def main(self): - with RabbitSession(self.cfg) as self.rabbit: - # ------------------------------------------------------------ - docs_to_enrich_qn = cfg['rabbitmq_queue'] - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.callback, - specific_queue=docs_to_enrich_qn) - - return - - -if __name__ == "__main__": - - import yaml - import time - import signal - import argparse - - signal.signal(signal.SIGINT, exit_gracefully) - - parser = argparse.ArgumentParser(description='Sample generator') - parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True) - parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672) - parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) - parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) - parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) - - args = parser.parse_args() - - cfg = dict() - cfg['rabbitmq'] = dict() - cfg['rabbitmq_host'] = args.host - cfg['rabbitmq_port'] = args.port - cfg['rabbitmq_exchange'] = args.exchange - cfg['rabbitmq_queue'] = args.queue - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' - cfg['rabbitmq']['queue_logs_name'] = 'session_logs' - cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' - cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' - - logging.getLogger().setLevel(args.loglevel) - logging.info('Starting...') - - while True: - - try: - Sampler(cfg).main() - except pika.exceptions.ChannelClosed: - logging.info("Waiting for tasks...") - time.sleep(5) - except pika.exceptions.AMQPConnectionError: - logging.info('Waiting for RabbitMQ to be reachable...') - time.sleep(5) - except Exception as e: - logging.error(e) - time.sleep(5) - exit(1)
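Taken together with the reindexer, the Sampler.callback removed above closes the loop: it copies a handful of documents from the ingest index into the public index, flagged as samples and re-keyed from '.full' to '.meta'. A condensed sketch of that bulk push, with placeholder URL and index name:

import json
from elasticsearch import Elasticsearch

def push_samples(docs, uuid, dest_url='http://destination-es:9200', dest_index='destination-index'):
    # mark each document as a sample, move it under the .meta uuid, then bulk-index it
    es = Elasticsearch([dest_url], timeout=60)
    header = {'index': {'_index': dest_index, '_type': '_doc'}}
    ndjson = ''
    for doc in docs:
        doc['editorial-metadata']['isSample'] = True
        doc['uuid'] = uuid.replace('.full', '.meta')
        ndjson += json.dumps(header) + '\n' + json.dumps(doc) + '\n'
    rep = es.bulk(body=ndjson)
    return not rep['errors']   # True when every action succeeded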