diff --git a/workers/doc-enricher-oo.py b/workers/doc-enricher-oo.py
new file mode 100644
index 0000000000000000000000000000000000000000..0dc47ce308366e33248f9bb913f0529f2e17b7f3
--- /dev/null
+++ b/workers/doc-enricher-oo.py
@@ -0,0 +1,402 @@
+import pika
+import msgpack
+import requests
+import json
+import datetime
+import os, sys
+from sqlalchemy.exc import NoSuchTableError
+
+fileDir = os.path.dirname(os.path.abspath(__file__))
+parentDir = os.path.dirname(fileDir)
+newPath = os.path.join(parentDir)
+sys.path.append(newPath)
+from lib.exit_gracefully import exit_gracefully
+from lib.my_logging import logging
+from lib.postgis_helper import Remote
+from lib.serializers import encode_datetime
+
+from lib.rabbit_session import RabbitSession
+from lib.log_message import LogMessage
+
+
+class DocEnricher:
+
+    def __init__(self, cfg):
+        self.cfg = cfg
+        self.rabbit = None
+
+    def truncate(self, n, decimals=0):
+        multiplier = 10 ** decimals
+        return int(n * multiplier) / multiplier
+
+    def get_entries_from_postgis(self, link, cfg, no_features_per_page=1000):
+
+        dbname = link['url'].split('/')[-1]
+        schema, table_name = link['name'].split('.')
+
+        logging.info('Getting data from database %s...' % dbname)
+        logging.info('Establishing a database connection...')
+        pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password'])
+        logging.info('Done.')
+
+        try:
+            table = pg.get_table(table_name, schema=schema)
+        except NoSuchTableError:
+            logging.debug('Table %s in schema %s not found :-(' % (table_name, schema))
+            return
+
+        count = pg.count_entries(table)
+
+        no_pages = count // no_features_per_page + 1
+        logging.info('Getting %i entries in %i pages from table %s.%s...' % (count, no_pages, schema, table_name))
+
+        feature_page = []  # we accumulate entries in this buffer
+        cnt = 0
+        for entry in pg.get_entries(table):
+            feature_page.append(entry)
+            if len(feature_page) == no_features_per_page:
+                cnt += 1
+                logging.debug('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page)))
+                yield (cnt / no_pages, count, feature_page)
+                feature_page = []
+
+        logging.debug('Yielding last page with %i features' % len(feature_page))
+        yield (1, count, feature_page)  # this will be the last feature_page
+
+        return
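+
+    # A minimal usage sketch for the generator above; the 'link' and
+    # connection values below are hypothetical, for illustration only:
+    #
+    #   enricher = DocEnricher(cfg={})
+    #   link = {'url': 'postgresql://dbhost/mydb', 'name': 'public.roads'}
+    #   pg_cfg = {'host': 'dbhost', 'username': 'user', 'password': 'secret'}
+    #   for ratio, total, page in enricher.get_entries_from_postgis(link, pg_cfg):
+    #       print('%.0f%% done: %i of %i features' % (ratio * 100, len(page), total))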
+
+    def get_wfs(self, link, credentials, offset=0, no_features_per_page=1000):
+
+        root_url = link['url']
+        logging.debug('offset = %i, no_features_per_page = %i' % (offset, no_features_per_page))
+
+        params = {}
+        params['version'] = '2.0.0'
+        params['service'] = 'WFS'
+        params['outputFormat'] = 'geojson'
+        params['request'] = 'GetFeature'
+        params['maxFeatures'] = no_features_per_page
+        params['typeName'] = link['name']
+        # params['sortBy'] = 'gid' # TODO: is it safe not to force any sortBy?
+        params['SRSNAME'] = 'epsg:4326'
+
+        cnt = offset // no_features_per_page + 1
+
+        logging.info('WFS page %i; offset = %i' % (cnt, offset))
+        params['startindex'] = offset
+
+        with_credentials = False
+        for domain in credentials:
+            if domain in link['url'] and credentials[domain]['username'] is not None and credentials[domain]['password']:
+                logging.info('Found a valid credential.')
+                with_credentials = True
+                username = credentials[domain]['username']
+                password = credentials[domain]['password']
+                break
+
+        if with_credentials:
+            res = requests.get(root_url, params=params, auth=(username, password))
+        else:
+            res = requests.get(root_url, params=params)
+
+        logging.debug(res.url)
+
+        try:
+            features = res.json()['features']
+            logging.debug(len(features))
+            return features
+
+        except (json.decoder.JSONDecodeError, KeyError):
+            # the WFS request failed, for instance because of insufficient access rights
+            logging.error('Failed WFS request: %s' % res.url)
+            return None
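+
+    # For reference, the request built in get_wfs() resolves to a WFS 2.0.0
+    # GetFeature URL of roughly this form (endpoint and layer name are
+    # illustrative):
+    #
+    #   https://download.example.org/wfs?service=WFS&version=2.0.0
+    #       &request=GetFeature&typeName=schema.layer&outputFormat=geojson
+    #       &maxFeatures=1000&startindex=0&SRSNAME=epsg:4326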
+
+    def old_enrich_docs(self, channel, method, properties, body, **kwargs):
+
+        decoded_body = msgpack.unpackb(body, raw=False)
+
+        wfs_info = decoded_body['header']['wfs_info']
+        offset = decoded_body['header']['offset']
+        session_id = decoded_body['header']['session_id']
+        dest_index = decoded_body['header']['dest_index']
+
+        logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])
+
+        feature_page = self.get_wfs(wfs_info, kwargs['credentials'], offset, kwargs['features_per_page'])
+
+        # we implement pagination by letting this program create tasks for itself / its siblings
+        if feature_page is not None and len(feature_page) == kwargs['features_per_page']:  # at least another page is needed
+            msg = {'header': {'wfs_info': wfs_info, 'offset': offset + kwargs['features_per_page'], 'session_id': session_id, 'dest_index': dest_index}, 'body': decoded_body['body']}
+            the_body = msgpack.packb(msg, use_bin_type=True)
+
+            channel.basic_publish(exchange=kwargs['exchange'],
+                                  routing_key=kwargs['docs_to_enrich_rk'],
+                                  body=the_body,
+                                  properties=pika.BasicProperties(delivery_mode=2)
+                                  )
+
+        if feature_page is not None:
+
+            logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' % len(feature_page))
+            doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page]
+
+            msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page}
+            the_body = msgpack.packb(msg, use_bin_type=True)
+
+            channel.basic_publish(exchange=kwargs['exchange'],
+                                  routing_key=kwargs['doc_pages_to_store_in_mongo_rk'],
+                                  body=the_body,
+                                  properties=pika.BasicProperties(delivery_mode=2)
+                                  )
+
+            channel.basic_publish(exchange=kwargs['exchange'],
+                                  routing_key=kwargs['doc_pages_to_process_rk'],
+                                  body=the_body,
+                                  properties=pika.BasicProperties(delivery_mode=2)
+                                  )
+
+            logging.info('...done!')
+
+        channel.basic_ack(delivery_tag=method.delivery_tag)
+
+        return
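+
+    # enrich_docs() below expects msgpack-encoded messages whose shape can be
+    # reconstructed from the fields it accesses; roughly (values illustrative):
+    #
+    #   {
+    #     'header': {
+    #       'wfs_info': {'url': '...', 'name': 'schema.table'},
+    #       'cfg': {'session': {'id': '...', 'current_uuid': '...'},
+    #               'postgis': {...}, 'rabbitmq': {...}}
+    #     },
+    #     'body': {'metadata-fr': {'title': '...'}, 'slug': '...', ...}
+    #   }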
+
+    def enrich_docs(self, channel, method, properties, body):
+
+        decoded_body = msgpack.unpackb(body, raw=False)
+
+        wfs_info = decoded_body['header']['wfs_info']
+        cfg = decoded_body['header']['cfg']
+
+        # ---------------------- send log ----------------------------
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='doc-enricher',
+                                 status=decoded_body['body']['metadata-fr']['title'],
+                                 uuid_prefix='meta',
+                                 info='starting'
+                                 )
+        self.rabbit.publish_log(log_message=log_message.__dict__)
+        # -------------------------------------------------------------
+
+        logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])
+
+        postgis_cfg = cfg['postgis']
+        del cfg['postgis']  # <- as this information is no longer needed
+
+        for progress_ratio, count, feature_page in self.get_entries_from_postgis(wfs_info, postgis_cfg):
+
+            last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
+
+            if len(feature_page) == 0:
+                count = 1  # at least we have a metadata-only document! count cannot be left at 0, otherwise the reindexer would never start
+                logging.debug('Empty page!')
+                doc_page = [{**decoded_body['body'], 'last_update': last_update}]
+            else:
+                doc_page = [{**decoded_body['body'], 'last_update': last_update, 'data-fr': feature} for feature in feature_page]
+
+            logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio * 100, len(doc_page), doc_page[0]['slug']))
+            progress_rounded = round(progress_ratio * 100)
+            if progress_rounded % 10 == 0:  # log progress every 10% only
+                info_message = '[' + str(progress_rounded) + '%] Sending ' + str(len(doc_page)) + ' docs to RabbitMQ for dataset ' + doc_page[0]['slug'] + '...'
+                # ---------------------- send log ----------------------------
+                log_message = LogMessage(session_id=cfg['session']['id'],
+                                         uuid=cfg['session']['current_uuid'],
+                                         step='doc-enricher',
+                                         status=decoded_body['body']['metadata-fr']['title'],
+                                         uuid_prefix='meta',
+                                         info=info_message
+                                         )
+                self.rabbit.publish_log(log_message=log_message.__dict__)
+                # -------------------------------------------------------------
+
+            msg = dict()
+            msg['header'] = dict()
+            msg['header']['cfg'] = cfg
+            msg['header']['progress_ratio'] = progress_ratio
+            msg['header']['count'] = count
+            msg['body'] = doc_page
+
+            the_task_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
+
+            # ------------------------------ send task -------------------------------
+            exchange = cfg['rabbitmq']['exchange']
+            doc_pages_to_process_qn = cfg['rabbitmq']['queue_name_5']
+            doc_pages_to_process_rk = cfg['rabbitmq']['routing_key_5']
+
+            self.rabbit.publish_task(the_body=the_task_body,
+                                     exchange=exchange,
+                                     routing_key=doc_pages_to_process_rk,
+                                     queue_name=doc_pages_to_process_qn)
+            # -------------------------------------------------------------------------
+
+        # ---------------------- send log ----------------------------
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='doc-enricher',
+                                 status='Terminated.',
+                                 uuid_prefix='meta',
+                                 info='no info'
+                                 )
+        self.rabbit.publish_log(log_message=log_message.__dict__)
+        # -------------------------------------------------------------
+
+        del cfg['rabbitmq']['queue_name_5']
+        del cfg['rabbitmq']['routing_key_5']
+
+        return
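+
+    # main() below relies on lib.rabbit_session.RabbitSession, used as a
+    # context manager; the methods listed here are inferred from the call
+    # sites in this file, not from the library itself:
+    #
+    #   publish_log(log_message=<dict>)
+    #   publish_task(the_body=<bytes>, exchange=..., routing_key=..., queue_name=...)
+    #   consume_queue_and_launch_specific_method(specific_method=<callable>,
+    #                                            specific_queue=<queue name>)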
+
+    def main(self):
+
+        with RabbitSession(self.cfg) as self.rabbit:
+            # the queue this program will consume messages from:
+            docs_to_enrich_qn = self.cfg['rabbitmq_queue']
+            self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.enrich_docs,
+                                                                 specific_queue=docs_to_enrich_qn)
+        # <-- the RabbitMQ connection is automatically closed here
+
+
+if __name__ == '__main__':
+
+    import time
+    import signal
+    import argparse
+
+    signal.signal(signal.SIGINT, exit_gracefully)
+
+    parser = argparse.ArgumentParser(description='Document enricher')
+    parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, default='rabbitmq')
+    parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672)
+    parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str,
+                        default='download_data_grandlyon_com_index')
+    parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, default='doc_pages_to_enrich')
+    parser.add_argument('--loglevel', dest='loglevel', help='the log level', default='INFO', type=str,
+                        choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR'])
+
+    args = parser.parse_args()
+
+    cfg = dict()
+    cfg['rabbitmq'] = dict()
+    cfg['rabbitmq_host'] = args.host
+    cfg['rabbitmq_port'] = args.port
+    cfg['rabbitmq_exchange'] = args.exchange
+    cfg['rabbitmq_queue'] = args.queue
+    cfg['rabbitmq']['user'] = 'admin'
+    cfg['rabbitmq']['password'] = 'admin'
+    cfg['rabbitmq']['queue_logs_name'] = 'session_logs'
+    cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key'
+    cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs'
+
+    logging.getLogger().setLevel(args.loglevel)
+    logging.info('Starting...')
+
+    while True:
+        try:
+            DocEnricher(cfg).main()
+        except pika.exceptions.ChannelClosed:
+            logging.info('Waiting for tasks...')
+            time.sleep(5)
+        except pika.exceptions.AMQPConnectionError:
+            logging.info('Waiting for RabbitMQ to be reachable...')
+            time.sleep(5)
+        except Exception as e:
+            logging.error(e)
+            time.sleep(5)
+            exit(1)
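+
+# Example invocation (the values shown match the argparse defaults above):
+#
+#   python workers/doc-enricher-oo.py --host rabbitmq --port 5672 \
+#       --exchange download_data_grandlyon_com_index --queue doc_pages_to_enrich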
diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py
index 6a1e1db9c7678d2a114a07f3eb7324735c22b10b..18104afb4bb4c736b18db64175d4d6bb77ccd8cc 100644
--- a/workers/doc-enricher.py
+++ b/workers/doc-enricher.py
@@ -14,6 +14,8 @@
 from lib.exit_gracefully import exit_gracefully
 from lib.my_logging import logging
 from lib.postgis_helper import Remote
 from lib.serializers import encode_datetime
+from lib.log_message import LogMessage
+
 
 def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
@@ -205,6 +207,9 @@
     exchange = cfg['rabbitmq']['exchange']
     doc_pages_to_process_qn = cfg['rabbitmq']['queue_name_5']
     doc_pages_to_process_rk = cfg['rabbitmq']['routing_key_5']
+    queue_logs_name = cfg['rabbitmq']['queue_logs_name']
+    routing_key_logs = cfg['rabbitmq']['routing_key_logs']
+    exchange_logs_name = cfg['rabbitmq']['exchange_logs_name']
 
     del cfg['rabbitmq']['queue_name_5']
     del cfg['rabbitmq']['routing_key_5']
@@ -214,6 +219,8 @@
 
     channel.queue_declare(queue=doc_pages_to_process_qn, durable=True)
     channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk)
+    channel.queue_declare(queue=queue_logs_name, durable=True)
+    channel.queue_bind(exchange=exchange_logs_name, queue=queue_logs_name, routing_key=routing_key_logs)
 
     logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])
@@ -232,6 +239,30 @@
             doc_page = [{**decoded_body['body'], 'last_update': last_update, 'data-fr': feature} for feature in feature_page]
 
         logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug']))
+        progress_rounded = round(progress_ratio * 100)
+        if progress_rounded % 10 == 0:  # log progress every 10% only
+            status_message = '[' + str(progress_rounded) + '%] Sending ' + str(
+                len(doc_page)) + ' docs to RabbitMQ for dataset ' + str(doc_page[0]['slug']) + '...'
+            # ---------------------- send log ----------------------------
+            log_message = LogMessage(session_id=cfg['session']['id'],
+                                     uuid=cfg['session']['current_uuid'],
+                                     step='doc-enricher',
+                                     status=status_message,
+                                     uuid_prefix='meta',
+                                     info='no info'
+                                     )
+
+            json_body = json.dumps(log_message.__dict__)  # LogMessage itself is not JSON-serializable
+
+            print(" [x] json body : ", json_body)
+
+            channel.basic_publish(exchange=exchange_logs_name,  # the logs queue is bound to this exchange
+                                  routing_key=routing_key_logs,
+                                  body=json_body,
+                                  properties=pika.BasicProperties(delivery_mode=2)
+                                  )
+            # ------------------------------------------------------------
 
         msg = dict()
         msg['header'] = dict()
@@ -249,7 +280,26 @@
                                )
 
         #logging.info('...done!')
-
+        # ---------------------- send log ----------------------------
+        log_message = LogMessage(session_id=cfg['session']['id'],
+                                 uuid=cfg['session']['current_uuid'],
+                                 step='doc-enricher',
+                                 status='done',
+                                 uuid_prefix='meta',
+                                 info='no info'
+                                 )
+
+        json_body = json.dumps(log_message.__dict__)  # same serialization fix as above
+
+        print(" [x] json body : ", json_body)
+
+        channel.basic_publish(exchange=exchange_logs_name,
+                              routing_key=routing_key_logs,
+                              body=json_body,
+                              properties=pika.BasicProperties(delivery_mode=2)
+                              )
+        # ------------------------------------------------------------
     # except TypeError: # it means that getWFS returned None
     #     pass
     #     #