diff --git a/.env.template b/.env.template index bb7adf6e9ac7af1efff63695226ccc41fe1eadad..7f2df560f3915ee9b854c1cc301f4d823197c782 100644 --- a/.env.template +++ b/.env.template @@ -1,3 +1,5 @@ RABBIT_USER=<user_login> RABBIT_PASSWORD=<user_password> +MONGO_USER=<user_login> +MONGO_PASSWORD=<user_password> PWD=. \ No newline at end of file diff --git a/docker-compose-workers.yml b/docker-compose-workers.yml index b48696ab607be5ec97365f555fd3675cb3e2b7e4..e54331fdb450cd6e0e199c2b3f348e2ad31bb3d0 100644 --- a/docker-compose-workers.yml +++ b/docker-compose-workers.yml @@ -14,7 +14,7 @@ services: metadata-processor: build: . image: data-grandlyon-com-indexer - command: python workers/metadata-processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue metadata_records_to_process --loglevel DEBUG + command: python workers/metadata-processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue metadata_records_to_process --loglevel DEBUG --user ${RABBIT_USER} --password ${RABBIT_PASSWORD} volumes: #- ./config.yaml:/app/config.yaml:ro - working-directory:/app/output @@ -23,7 +23,7 @@ services: doc-enricher: build: . image: data-grandlyon-com-indexer - command: python workers/doc-enricher.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_enrich --loglevel DEBUG + command: python workers/doc-enricher.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_enrich --loglevel DEBUG --user ${RABBIT_USER} --password ${RABBIT_PASSWORD} # volumes: # - ./config.yaml:/app/config.yaml:ro restart: unless-stopped @@ -31,7 +31,7 @@ services: doc-processor: build: . image: data-grandlyon-com-indexer - command: python workers/doc-processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_process --loglevel DEBUG + command: python workers/doc-processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_process --loglevel DEBUG --user ${RABBIT_USER} --password ${RABBIT_PASSWORD} volumes: # - ./config.yaml:/app/config.yaml:ro - working-directory:/app/output @@ -40,7 +40,7 @@ services: doc-indexer: build: . image: data-grandlyon-com-indexer - command: python workers/doc-indexer.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_index --loglevel DEBUG + command: python workers/doc-indexer.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_index --loglevel DEBUG --user ${RABBIT_USER} --password ${RABBIT_PASSWORD} # volumes: # - ./config.yaml:/app/config.yaml:ro restart: unless-stopped @@ -48,7 +48,7 @@ services: reindexer: build: . image: data-grandlyon-com-indexer - command: python workers/reindexer.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue reindex_tasks --loglevel DEBUG + command: python workers/reindexer.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue reindex_tasks --loglevel DEBUG --user ${RABBIT_USER} --password ${RABBIT_PASSWORD} volumes: - working-directory:/app/output restart: unless-stopped @@ -56,7 +56,7 @@ services: sampler: build: . 
image: data-grandlyon-com-indexer - command: python workers/sample-generator.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue sampling_tasks --loglevel DEBUG + command: python workers/sample-generator.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue sampling_tasks --loglevel DEBUG --user ${RABBIT_USER} --password ${RABBIT_PASSWORD} volumes: - working-directory:/app/output restart: unless-stopped @@ -79,7 +79,7 @@ services: process-logger: build: . image: data-grandlyon-com-process-logger - command: python workers/process-logger.py --loglevel DEBUG + command: python workers/process-logger.py --loglevel DEBUG --user ${RABBIT_USER} --password ${RABBIT_PASSWORD} --mongouser ${MONGO_USER} --mongopassword ${MONGO_PASSWORD} # volumes: # - ./config.yaml:/app/config.yaml:ro restart: unless-stopped diff --git a/workers/doc-enricher.py b/workers/doc-enricher.py index 0dc47ce308366e33248f9bb913f0529f2e17b7f3..6a1e1db9c7678d2a114a07f3eb7324735c22b10b 100644 --- a/workers/doc-enricher.py +++ b/workers/doc-enricher.py @@ -15,340 +15,276 @@ from lib.my_logging import logging from lib.postgis_helper import Remote from lib.serializers import encode_datetime -from lib.rabbit_session import RabbitSession -from lib.log_message import LogMessage +def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ): -class DocEnricher: + dbname = link['url'].split('/')[-1] + schema, table_name = link['name'].split('.') - def __init__(self, cfg): - self.cfg = cfg - self.rabbit = None + logging.info('Getting data from database %s...' % dbname) + logging.info('Establishing a database connection...') + pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password']) + logging.info('Done.') - def truncate(self, n, decimals=0): - multiplier = 10 ** decimals - return int(n * multiplier) / multiplier + try: + table = pg.get_table(table_name, schema=schema) + except NoSuchTableError: + logging.debug('Table %s in schema % s not found :-(' % (table_name, schema)) + return - def get_entries_from_postgis(self, link, cfg, no_features_per_page=1000): + count = pg.count_entries(table) - dbname = link['url'].split('/')[-1] - schema, table_name = link['name'].split('.') + no_pages = count//no_features_per_page+1 + logging.info('Getting %i entries in %i pages from table %s.%s...'% (count, no_pages, schema, table_name)) - logging.info('Getting data from database %s...' 
% dbname) - logging.info('Establishing a database connection...') - pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password']) - logging.info('Done.') + feature_page = [] # we accumulate entries in this sort of buffer + cnt = 0 + for entry in pg.get_entries(table): + feature_page.append(entry) + if len(feature_page) == no_features_per_page: + cnt += 1 + logging.debug('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page))) + yield (cnt/no_pages, count, feature_page) + feature_page = [] - try: - table = pg.get_table(table_name, schema=schema) - except NoSuchTableError: - logging.debug('Table %s in schema % s not found :-(' % (table_name, schema)) - return + logging.debug('Yielding last page with %i features' % len(feature_page)) + yield (1, count, feature_page) # this will be the last feature_page - count = pg.count_entries(table) + return - no_pages = count//no_features_per_page+1 - logging.info('Getting %i entries in %i pages from table %s.%s...'% (count, no_pages, schema, table_name)) - feature_page = [] # we accumulate entries in this sort of buffer - cnt = 0 - for entry in pg.get_entries(table): - feature_page.append(entry) - if len(feature_page) == no_features_per_page: - cnt += 1 - logging.debug('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page))) - yield (cnt/no_pages, count, feature_page) - feature_page = [] +def get_wfs( link, credentials, offset=0, no_features_per_page=1000 ): - logging.debug('Yielding last page with %i features' % len(feature_page)) - yield (1, count, feature_page) # this will be the last feature_page + root_url = link['url'] + print(offset, no_features_per_page) - return + params = {} + params['version'] = '2.0.0' + params['service'] = 'WFS' + params['outputFormat'] = 'geojson' + params['request'] = 'GetFeature' + params['maxFeatures'] = no_features_per_page + params['typeName'] = link['name'] + # params['sortBy'] = 'gid' # TODO: is it safe not to force any sortBy? + #params['startindex'] = 11 + params['SRSNAME'] = 'epsg:4326' + #startindex = 0 + cnt = offset / no_features_per_page + 1 - def get_wfs( link, credentials, offset=0, no_features_per_page=1000 ): - root_url = link['url'] - print(offset, no_features_per_page) + logging.info('WFS page %i; offset = %i' % (cnt, offset)) + params['startindex'] = offset #0 + cnt*no_features_per_page + #params['to'] = params['from'] + no_records_per_page - 1 - params = {} - params['version'] = '2.0.0' - params['service'] = 'WFS' - params['outputFormat'] = 'geojson' - params['request'] = 'GetFeature' - params['maxFeatures'] = no_features_per_page - params['typeName'] = link['name'] - # params['sortBy'] = 'gid' # TODO: is it safe not to force any sortBy? 
- #params['startindex'] = 11 - params['SRSNAME'] = 'epsg:4326' + with_credentials = False + for domain in credentials: + if domain in link['url'] and credentials[domain]['username'] != None and credentials[domain]['password']: + logging.info('Found a valid credential.') + with_credentials = True + username = credentials[domain]['username'] + password = credentials[domain]['password'] + break - #startindex = 0 - cnt = offset / no_features_per_page + 1 + if with_credentials: + res = requests.get(root_url, params = params, auth=(username, password)) + else: + res = requests.get(root_url, params = params) + logging.debug(res.url) - logging.info('WFS page %i; offset = %i' % (cnt, offset)) - params['startindex'] = offset #0 + cnt*no_features_per_page - #params['to'] = params['from'] + no_records_per_page - 1 + try: + # print(res.status_code) + # print(res.text) + # print(res.json()) + features = res.json()['features'] + #processed_features = process_features(features) + logging.debug(len(features)) - with_credentials = False - for domain in credentials: - if domain in link['url'] and credentials[domain]['username'] != None and credentials[domain]['password']: - logging.info('Found a valid credential.') - with_credentials = True - username = credentials[domain]['username'] - password = credentials[domain]['password'] - break + return features - if with_credentials: - res = requests.get(root_url, params=params, auth=(username, password)) - else: - res = requests.get(root_url, params=params) + # if len(features) < no_features_per_page: + # break # it means that we have reached the last page + # + # cnt += 1 - logging.debug(res.url) - try: - # print(res.status_code) - # print(res.text) - # print(res.json()) - features = res.json()['features'] - #processed_features = process_features(features) - logging.debug(len(features)) + except: #json.decoder.JSONDecodeError: # it means that we the WFS request failed, for instance because of insufficient right access + #logging.error("Failed WFS request: %s" % res.url) + logging.error("Failed WFS request: %s" % res.url) - return features + #yield None - # if len(features) < no_features_per_page: - # break # it means that we have reached the last page - # - # cnt += 1 + #raise Exception("Failed WFS request: %s" % res.url) + return None - except: #json.decoder.JSONDecodeError: # it means that we the WFS request failed, for instance because of insufficient right access - #logging.error("Failed WFS request: %s" % res.url) - logging.error("Failed WFS request: %s" % res.url) + #print() - #yield None - #raise Exception("Failed WFS request: %s" % res.url) - return None - #print() - def old_enrich_docs( self, channel, method, properties, body, **kwargs ): - decoded_body = msgpack.unpackb(body, raw=False) +def old_enrich_docs( channel, method, properties, body, **kwargs ): - wfs_info = decoded_body['header']['wfs_info'] - offset = decoded_body['header']['offset'] - session_id = decoded_body['header']['session_id'] - dest_index = decoded_body['header']['dest_index'] + decoded_body = msgpack.unpackb(body, raw=False) - logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title']) + wfs_info = decoded_body['header']['wfs_info'] + offset = decoded_body['header']['offset'] + session_id = decoded_body['header']['session_id'] + dest_index = decoded_body['header']['dest_index'] - feature_page = self.get_wfs(wfs_info, kwargs['credentials'], offset, kwargs['features_per_page']) - # we implement pagination by letting this program creating tasks for itself 
/ its siblings - if feature_page != None and len(feature_page) == kwargs['features_per_page']: # at least another page is needed - msg = {'header': {'wfs_info': wfs_info, 'offset': offset+kwargs['features_per_page'], 'session_id': session_id, 'dest_index': dest_index}, 'body': decoded_body['body']} - the_body = msgpack.packb(msg, use_bin_type=True) + logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title']) - channel.basic_publish( exchange=kwargs['exchange'], - routing_key=kwargs['docs_to_enrich_rk'], - body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) - ) + feature_page = get_wfs(wfs_info, kwargs['credentials'], offset, kwargs['features_per_page']) - if feature_page != None: + # we implement pagination by letting this program creating tasks for itself / its siblings + if feature_page != None and len(feature_page) == kwargs['features_per_page']: # at least another page is needed + msg = {'header': {'wfs_info': wfs_info, 'offset': offset+kwargs['features_per_page'], 'session_id': session_id, 'dest_index': dest_index}, 'body': decoded_body['body']} + the_body = msgpack.packb(msg, use_bin_type=True) - # try: - # #for feature_page in feature_pages: - # #print(feature_page[0]['properties']['nom_reduit']) - logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' % len(feature_page)) - doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page] + channel.basic_publish( exchange=kwargs['exchange'], + routing_key=kwargs['docs_to_enrich_rk'], + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2) + ) - msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page} - the_body = msgpack.packb(msg, use_bin_type=True) + if feature_page != None: - channel.basic_publish( exchange=kwargs['exchange'], - routing_key=kwargs['doc_pages_to_store_in_mongo_rk'], - body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) - ) + # try: + # #for feature_page in feature_pages: + # #print(feature_page[0]['properties']['nom_reduit']) + logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' 
% len(feature_page)) + doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page] + msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page} + the_body = msgpack.packb(msg, use_bin_type=True) - channel.basic_publish( exchange=kwargs['exchange'], - routing_key=kwargs['doc_pages_to_process_rk'], - body=the_body, - properties=pika.BasicProperties(delivery_mode = 2) - ) - logging.info('...done!') + channel.basic_publish( exchange=kwargs['exchange'], + routing_key=kwargs['doc_pages_to_store_in_mongo_rk'], + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2) + ) - # except TypeError: # it means that getWFS returned None - # pass - # # - # #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + channel.basic_publish( exchange=kwargs['exchange'], + routing_key=kwargs['doc_pages_to_process_rk'], + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2) + ) + logging.info('...done!') - channel.basic_ack(delivery_tag = method.delivery_tag) - #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + # except TypeError: # it means that getWFS returned None + # pass + # # + # #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - return #out_docs - def enrich_docs( self, channel, method, properties, body ): + channel.basic_ack(delivery_tag = method.delivery_tag) + #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + return #out_docs - decoded_body = msgpack.unpackb(body, raw=False) - wfs_info = decoded_body['header']['wfs_info'] - cfg = decoded_body['header']['cfg'] - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-enricher', - status=decoded_body['body']['metadata-fr']['title'], - uuid_prefix='meta', - info='starting' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------------------ +def enrich_docs( channel, method, properties, body ): - # # initialize RabbitMQ queues - # exchange = cfg['rabbitmq']['exchange'] - # doc_pages_to_process_qn = cfg['rabbitmq']['queue_name_5'] - # doc_pages_to_process_rk = cfg['rabbitmq']['routing_key_5'] - # - # del cfg['rabbitmq']['queue_name_5'] - # del cfg['rabbitmq']['routing_key_5'] - # - # channel.exchange_declare(exchange=exchange, exchange_type='direct') - # - # channel.queue_declare(queue=doc_pages_to_process_qn, durable=True) - # channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk) - - logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title']) - - postgis_cfg = cfg['postgis'] - del cfg['postgis'] # <- as this information is no longer needed - - for progress_ratio, count, feature_page in self.get_entries_from_postgis(wfs_info, postgis_cfg): - - last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") - - if len(feature_page) == 0: - count = 1 # at least we have a metadata-only document! 
count cannot be left = 0, otherwise the reindexer would never start - logging.debug('Empty page!') - doc_page = [{**decoded_body['body'], 'last_update': last_update}] - else: - doc_page = [{**decoded_body['body'], 'last_update': last_update, 'data-fr': feature} for feature in feature_page] - - - logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' % (progress_ratio*100, len(doc_page), doc_page[0]['slug'])) - progress_rounded = round(progress_ratio*100) - if progress_rounded % 10: - info_message = '[' +str(progress_rounded)+'%] Sending ' + str(len(doc_page)) + 'docs to RabbitMQ for dataset ' + doc_page[0]['slug'] + '...' - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-enricher', - status=decoded_body['body']['metadata-fr']['title'], - uuid_prefix='meta', - info=info_message - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------------------ - msg = dict() - msg['header'] = dict() - msg['header']['cfg'] = cfg - msg['header']['progress_ratio'] = progress_ratio - msg['header']['count'] = count - msg['body'] = doc_page - - the_task_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime) - - # channel.basic_publish( exchange=exchange, - # routing_key=doc_pages_to_process_rk, - # body=the_body, - # properties=pika.BasicProperties(delivery_mode = 2) - # ) - # ------------------------------send-task-------------------------------------- - - # initialize RabbitMQ queues - exchange = cfg['rabbitmq']['exchange'] - doc_pages_to_process_qn = cfg['rabbitmq']['queue_name_5'] - doc_pages_to_process_rk = cfg['rabbitmq']['routing_key_5'] - - - - self.rabbit.publish_task(the_body=the_task_body, - exchange=exchange, - routing_key=doc_pages_to_process_rk, - queue_name=doc_pages_to_process_qn) - - # ------------------------------------------------------------------------- - - #logging.info('...done!') - - # except TypeError: # it means that getWFS returned None - # pass - # # - # #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - - # channel.basic_ack(delivery_tag = method.delivery_tag) - #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-enricher', - status='Terminated.', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------------------ - del cfg['rabbitmq']['queue_name_5'] - del cfg['rabbitmq']['routing_key_5'] - return #out_docs - - def main(self): - - with RabbitSession(self.cfg) as self.rabbit: - # ------------------------------------------------------------ - docs_to_enrich_qn = cfg['rabbitmq_queue'] - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.enrich_docs, - specific_queue=docs_to_enrich_qn) - # <-- the rabbit connexion is automatically closed here - - - #from lib.close_connection import on_timeout - - # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) - # #timeout = 5 - # 
#connection.add_timeout(timeout, on_timeout(connection)) - # - # channel = connection.channel() - # # exchange = cfg['rabbitmq_exchange'] - # # the queue this program will consume messages from: - # docs_to_enrich_qn = cfg['rabbitmq_queue'] - # - # channel.basic_qos(prefetch_count=1) - # channel.basic_consume(on_message_callback=lambda ch, method, properties, body: - # enrich_docs(ch, method, properties, body), - # #doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk, - # #docs_to_enrich_rk=docs_to_enrich_rk, - # #doc_pages_to_process_rk=doc_pages_to_process_rk, - # #features_per_page=cfg['wfs']['features_per_page'], - # #postgis_cfg=cfg['postgis']), - # queue=docs_to_enrich_qn)#, no_ack=True) - # channel.start_consuming() - # connection.close() + + decoded_body = msgpack.unpackb(body, raw=False) + + wfs_info = decoded_body['header']['wfs_info'] + cfg = decoded_body['header']['cfg'] + + # initialize RabbitMQ queues + exchange = cfg['rabbitmq']['exchange'] + doc_pages_to_process_qn = cfg['rabbitmq']['queue_name_5'] + doc_pages_to_process_rk = cfg['rabbitmq']['routing_key_5'] + + del cfg['rabbitmq']['queue_name_5'] + del cfg['rabbitmq']['routing_key_5'] + + channel.exchange_declare(exchange=exchange, exchange_type='direct') + + channel.queue_declare(queue=doc_pages_to_process_qn, durable=True) + channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk) + + + logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title']) + + postgis_cfg = cfg['postgis'] + del cfg['postgis'] # <- as this information is no longer needed + + for progress_ratio, count, feature_page in get_entries_from_postgis(wfs_info, postgis_cfg): + + last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + + if len(feature_page) == 0: + count = 1 # at least we have a metadata-only document! count cannot be left = 0, otherwise the reindexer would never start + logging.debug('Empty page!') + doc_page = [{**decoded_body['body'], 'last_update': last_update}] + else: + doc_page = [{**decoded_body['body'], 'last_update': last_update, 'data-fr': feature} for feature in feature_page] + + logging.info('[%6.2f%%] Sending %i docs to RabbitMQ for dataset %s...' 
% (progress_ratio*100, len(doc_page), doc_page[0]['slug'])) + + msg = dict() + msg['header'] = dict() + msg['header']['cfg'] = cfg + msg['header']['progress_ratio'] = progress_ratio + msg['header']['count'] = count + msg['body'] = doc_page + + the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime) + + channel.basic_publish( exchange=exchange, + routing_key=doc_pages_to_process_rk, + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2) + ) + + #logging.info('...done!') + + # except TypeError: # it means that getWFS returned None + # pass + # # + # #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + + channel.basic_ack(delivery_tag = method.delivery_tag) + #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + + return #out_docs + + +def main(cfg): + + #from lib.close_connection import on_timeout + + connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) + #timeout = 5 + #connection.add_timeout(timeout, on_timeout(connection)) + + channel = connection.channel() + # exchange = cfg['rabbitmq_exchange'] + # the queue this program will consume messages from: + docs_to_enrich_qn = cfg['rabbitmq_queue'] + + channel.basic_qos(prefetch_count=1) + channel.basic_consume(on_message_callback=lambda ch, method, properties, body: + enrich_docs(ch, method, properties, body), + #doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk, + #docs_to_enrich_rk=docs_to_enrich_rk, + #doc_pages_to_process_rk=doc_pages_to_process_rk, + #features_per_page=cfg['wfs']['features_per_page'], + #postgis_cfg=cfg['postgis']), + queue=docs_to_enrich_qn)#, no_ack=True) + channel.start_consuming() + connection.close() if __name__ == '__main__': @@ -362,33 +298,31 @@ if __name__ == '__main__': signal.signal(signal.SIGINT, exit_gracefully) parser = argparse.ArgumentParser(description='Document enricher') - parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, default='rabbitmq') + parser.add_argument('--host', dest='host', help='the RabbitMQ host', type=str, required=True) parser.add_argument('--port', dest='port', help='the RabbitMQ port', type=int, default=5672) - parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, - default='download_data_grandlyon_com_index') - parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, default='doc_pages_to_enrich') - parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + parser.add_argument('--exchange', dest='exchange', help='the RabbitMQ exchange', type=str, required=True) + parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) + parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, + choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) + parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) args = parser.parse_args() cfg = dict() - cfg['rabbitmq'] = dict() cfg['rabbitmq_host'] = args.host cfg['rabbitmq_port'] = args.port cfg['rabbitmq_exchange'] = args.exchange cfg['rabbitmq_queue'] = args.queue - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' - cfg['rabbitmq']['queue_logs_name'] = 'session_logs' - 
cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' - cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' + cfg['rabbitmq'] = dict() + cfg['rabbitmq']['user'] = args.user + cfg['rabbitmq']['password'] = args.password logging.getLogger().setLevel(args.loglevel) logging.info('Starting...') while True: try: - DocEnricher(cfg).main() + main(cfg) except pika.exceptions.ChannelClosed: logging.info('Waiting for tasks...') time.sleep(5) @@ -397,6 +331,5 @@ if __name__ == '__main__': time.sleep(5) except Exception as e: logging.error(e) - print('[xxxx] Exception', e) time.sleep(5) exit(1) diff --git a/workers/doc-indexer.py b/workers/doc-indexer.py index a6305e507679eca2fbcab2d001ec28db585c15c9..7a1e0668aa58c53ab26e64870876686635e33181 100644 --- a/workers/doc-indexer.py +++ b/workers/doc-indexer.py @@ -1,6 +1,8 @@ import pika import msgpack +import time import json +import hashlib import os, sys from elasticsearch import Elasticsearch from elasticsearch.exceptions import AuthorizationException @@ -11,226 +13,239 @@ newPath = os.path.join(parentDir) sys.path.append(newPath) from lib.exit_gracefully import exit_gracefully from lib.my_logging import logging - -from lib.rabbit_session import RabbitSession from lib.log_message import LogMessage +def tag_doc( the_doc ): + + # tag_list = ['isOpenAccess', 'isRealTime', 'isQueryable', 'isSearchable', 'isPunctual', 'isLinear', 'isAreal'] + tag_dict = {} + + # initialisation + #for tag in tag_list: + # tag_dict[tag] = False + + # isOpen? + if 'license' in the_doc['metadata-fr'].keys() and not any( [x in the_doc['metadata-fr']['license'] for x in ['Licence Associée', 'Licence Engagée'] ] ): + tag_dict['isOpenAccess'] = True + else: + tag_dict['isOpenAccess'] = False + + # isRealTime? + if 'updateFrequency' in the_doc['metadata-fr'].keys() and 'continual' in the_doc['metadata-fr']['updateFrequency']: + tag_dict['isRealTime'] = True + else: + tag_dict['isRealTime'] = False + + # isQueryable? + tag_dict['isQueryable'] = False # default + if 'link' in the_doc['metadata-fr'].keys(): + for link in the_doc['metadata-fr']['link']: + #print(link) + if 'service' in link.keys() and any( [x in link['service'] for x in ['WFS', 'WMS', 'KML', 'WS']] ): + tag_dict['isQueryable'] = True + break + + # N.B.: in order to determine the following tags, we need the data-fr field; + # in case the data-fr field is absent, the tags 'isSearchable', + # 'isPunctual', 'isLinear', 'isAreal' will be absent instead of being 'falsely' set to false! + + # isSearchable? + tag_dict['isSearchable'] = False # default + if 'data-fr' in the_doc.keys(): + tag_dict['isSearchable'] = True -class DocIndexer: - def __init__(self, cfg): - self.cfg = cfg - self.rabbit = None + if 'geometry' in the_doc['data-fr'].keys(): - @staticmethod - def tag_doc(the_doc): + # init + tag_dict['isPunctual'] = False + tag_dict['isLinear'] = False + tag_dict['isAreal'] = False - # tag_list = ['isOpenAccess', 'isRealTime', 'isQueryable', 'isSearchable', 'isPunctual', 'isLinear', 'isAreal'] - tag_dict = {} + # isPunctual? + if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Point', 'MultiPoint']] ): + tag_dict['isPunctual'] = True - # initialisation - # for tag in tag_list: - # tag_dict[tag] = False + # isLinear? + if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['LineString', 'MultiLineString']] ): + tag_dict['isLinear'] = True - # isOpen?
- if 'license' in the_doc['metadata-fr'].keys() and not any([x in the_doc['metadata-fr']['license'] - for x in ['Licence Associée', 'Licence Engagée']]): - tag_dict['isOpenAccess'] = True - else: - tag_dict['isOpenAccess'] = False + # isAreal? + if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Polygon', 'MultiPolygon']] ): + tag_dict['isAreal'] = True - # isRealTime? - if 'updateFrequency' in the_doc['metadata-fr'].keys() \ - and 'continual' in the_doc['metadata-fr']['updateFrequency']: - tag_dict['isRealTime'] = True - else: - tag_dict['isRealTime'] = False + # isSample? docs that are tagged by this script are never just a sample + tag_dict['isSample'] = False - # isQueryable? - tag_dict['isQueryable'] = False # default - if 'link' in the_doc['metadata-fr'].keys(): - for link in the_doc['metadata-fr']['link']: - # print(link) - if 'service' in link.keys() and any([x in link['service'] for x in ['WFS', 'WMS', 'KML', 'WS']]): - tag_dict['isQueryable'] = True - break + tagged_doc = {'editorial-metadata': tag_dict, **the_doc} - # N.B.: in order to determine the following tags, we need the data-fr field; - # in case the data-fr field is absent, the tags 'isSearchable', - # 'isPunctual', 'isLinear', 'isAreal' will be absent instead of being 'falsely' set to false! + return tagged_doc - # isSearchable? - tag_dict['isSearchable'] = False # default - if 'data-fr' in the_doc.keys(): - tag_dict['isSearchable'] = True - if 'geometry' in the_doc['data-fr'].keys(): +def index_docs(channel, method, properties, body): - # init - tag_dict['isPunctual'] = False - tag_dict['isLinear'] = False - tag_dict['isAreal'] = False + t1 = time.time() + decoded_body = msgpack.unpackb(body, raw=False) - # isPunctual? - if any([x in the_doc['data-fr']['geometry']['type'] for x in ['Point', 'MultiPoint']]): - tag_dict['isPunctual'] = True + cfg = decoded_body['header']['cfg'] - # isLinear? - if any([x in the_doc['data-fr']['geometry']['type'] for x in ['LineString', 'MultiLineString']]): - tag_dict['isLinear'] = True + exchange = cfg['rabbitmq']['exchange'] + queue_name = cfg['rabbitmq']['queue_name_4'] + routing_key = cfg['rabbitmq']['routing_key_4'] + queue_logs_name = cfg['rabbitmq']['queue_logs_name'] + routing_key_logs = cfg['rabbitmq']['routing_key_logs'] + exchange_logs_name = cfg['rabbitmq']['exchange_logs_name'] - # isAreal? - if any([x in the_doc['data-fr']['geometry']['type'] for x in ['Polygon', 'MultiPolygon']]): - tag_dict['isAreal'] = True + del cfg['rabbitmq']['queue_name_4'] + del cfg['rabbitmq']['routing_key_4'] - # isSample? 
docs that are tagged by this script are never just a sample - tag_dict['isSample'] = False + channel.exchange_declare(exchange=exchange, exchange_type='direct') + channel.queue_declare(queue=queue_name, durable=True) + channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key) + channel.queue_declare(queue=queue_logs_name, durable=True) + channel.queue_bind(exchange=exchange_logs_name, queue=queue_logs_name, routing_key=routing_key_logs) - tagged_doc = {'editorial-metadata': tag_dict, **the_doc} + progress_ratio = decoded_body['header']['progress_ratio'] + count = decoded_body['header']['count'] - return tagged_doc + es = Elasticsearch([cfg['indexer']['url']], timeout=60) + # es_logger = logging.getLogger('elasticsearch') + # es_logger.setLevel(logging.INFO) - def index_docs(self, channel, method, properties, body): - t1 = time.time() - decoded_body = msgpack.unpackb(body, raw=False) + if type(decoded_body['body']) is list: + #docs_to_index = decoded_body['body'] + docs_to_index = [tag_doc(doc) for doc in decoded_body['body']] + + else: + #docs_to_index = [decoded_body['body']] + docs_to_index = [tag_doc(decoded_body['body'])] + + logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." % (progress_ratio*100, len(docs_to_index), docs_to_index[0]['slug'])) + + progress_rounded = round(progress_ratio * 100) + if progress_rounded % 10: + status_message = '[' + str(progress_rounded) + '%] Sending ' + str( + len(docs_to_index)) + 'docs to RabbitMQ for dataset ' + str(docs_to_index[0]['slug']) + '...' # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], + log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], + uuid=cfg['session']['current_uuid'], step='doc-indexer', - status='Starting...', + status=status_message, uuid_prefix='meta', info='no info' ) - self.rabbit.publish_log(log_message=log_message.__dict__) + + json_body = json.dumps(log_message) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=exchange, + routing_key=routing_key_logs, + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) # ------------------------------------------------------------ - cfg = decoded_body['header']['cfg'] - progress_ratio = decoded_body['header']['progress_ratio'] - count = decoded_body['header']['count'] - es = Elasticsearch([cfg['indexer']['url']], timeout=60) - # es_logger = logging.getLogger('elasticsearch') - # es_logger.setLevel(logging.INFO) + es_body = '' + header = {'index': { "_index" : cfg['indexer']['index'], "_type" : "_doc" }} + #{ "index" : { "_index" : the_dest_index, "_type" : "_doc" } } + for doc in docs_to_index: + # header['index']['_id'] = hashlib.md5( json.dumps(tmp, sort_keys=True).encode("utf-8") ).hexdigest() + es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc)) - if type(decoded_body['body']) is list: - # docs_to_index = decoded_body['body'] - docs_to_index = [self.tag_doc(doc) for doc in decoded_body['body']] - else: - # docs_to_index = [decoded_body['body']] - docs_to_index = [self.tag_doc(decoded_body['body'])] + rep = es.bulk(body=es_body) - logging.info("[%6.2f%%] Pushing %i documents to Elasticsearch for dataset %s..." 
% (progress_ratio*100, - len(docs_to_index), - docs_to_index[0]['slug'])) + #print(rep) + t2 = time.time() - es_body = '' - header = {'index': {"_index": cfg['indexer']['index'], "_type": "_doc"}} - # { "index" : { "_index" : the_dest_index, "_type" : "_doc" } } - for doc in docs_to_index: - # header['index']['_id'] = hashlib.md5( json.dumps(tmp, sort_keys=True).encode("utf-8") ).hexdigest() - es_body += '{0}\n{1}\n'.format(json.dumps(header), json.dumps(doc)) + if rep['errors'] == False: + channel.basic_ack(delivery_tag = method.delivery_tag) + #print("") + logging.info("Done in %s seconds." % (t2-t1)) - rep = es.bulk(body=es_body) - # print(rep) - t2 = time.time() + if progress_ratio == 1: - if rep['errors'] is False: - channel.basic_ack(delivery_tag=method.delivery_tag) - # print("") - logging.info("Done in %s seconds." % (t2-t1)) + uuid = docs_to_index[0]['uuid'] + logging.info("Reached last page for dataset with uuid = %s. Creating reindexation task..." % (uuid)) - if progress_ratio == 1: - uuid = docs_to_index[0]['uuid'] - logging.info("Reached last page for dataset with uuid = %s. Creating reindexation task..." % uuid) - exchange = cfg['rabbitmq']['exchange'] - queue_name = cfg['rabbitmq']['queue_name_4'] - routing_key = cfg['rabbitmq']['routing_key_4'] + msg = dict() - del cfg['rabbitmq']['queue_name_4'] - del cfg['rabbitmq']['routing_key_4'] + msg['header'] = dict() + msg['header']['cfg'] = cfg + msg['header']['count'] = count + msg['body'] = uuid - # channel.exchange_declare(exchange=exchange, exchange_type='direct') - # channel.queue_declare(queue=queue_name, durable=True) - # channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key) + the_body = msgpack.packb(msg, use_bin_type=False) - msg = dict() + channel.basic_publish( exchange=exchange, + routing_key=routing_key, + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2)) - msg['header'] = dict() - msg['header']['cfg'] = cfg - msg['header']['count'] = count - msg['body'] = uuid - the_task_body = msgpack.packb(msg, use_bin_type=False) - # ------------------------send task----------------------------------- - self.rabbit.publish_task(the_body=the_task_body, - exchange=exchange, - routing_key=routing_key, - queue_name=queue_name) - # --------------------------------------------------------------------- + else: + channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + #print("") + logging.error(json.dumps(rep, indent=4)) + logging.error("Failed") - # channel.basic_publish( exchange=exchange, - # routing_key=routing_key, - # body=the_body, - # properties=pika.BasicProperties(delivery_mode = 2)) - else: - channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1) - # print("") - logging.error(json.dumps(rep, indent=4)) - logging.error("Failed") + #time.sleep(5) + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='doc-indexer', + status='terminated', + uuid_prefix='meta', + info='no info' + ) - # time.sleep(5) - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='doc-indexer', - status='Terminated...', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # 
------------------------------------------------------------ + json_body = json.dumps(log_message) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=exchange, + routing_key=routing_key_logs, + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ + + return - - return - - def main(self): - - with RabbitSession(self.cfg) as self.rabbit: - # ------------------------------------------------------------ - docs_to_enrich_qn = cfg['rabbitmq_queue'] - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.index_docs, - specific_queue=docs_to_enrich_qn) - - # from lib.close_connection import on_timeout - - # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], - # port=cfg['rabbitmq_port'])) - # # timeout = 5 - # # connection.add_timeout(timeout, on_timeout(connection)) - # - # channel = connection.channel() - # - # # the queue this program will be consuming messages from - # docs_to_index_qn = cfg['rabbitmq_queue'] - # - # channel.basic_qos(prefetch_count=1) - # channel.basic_consume(on_message_callback=lambda ch, method, - # properties, body: index_docs(ch, method, properties, body), - # queue=docs_to_index_qn)#, no_ack=True) - # channel.start_consuming() - # - # connection.close() + +def main(cfg): + + #from lib.close_connection import on_timeout + + connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) + # timeout = 5 + # connection.add_timeout(timeout, on_timeout(connection)) + + channel = connection.channel() + + # the queue this program will be consuming messages from + docs_to_index_qn = cfg['rabbitmq_queue'] + + channel.basic_qos(prefetch_count=1) + channel.basic_consume(on_message_callback=lambda ch, method, properties, body: index_docs(ch, method, properties, body), + queue=docs_to_index_qn)#, no_ack=True) + channel.start_consuming() + + connection.close() if __name__ == '__main__': + import yaml import time import signal import argparse @@ -242,20 +257,19 @@ if __name__ == '__main__': parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) + parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) args = parser.parse_args() cfg = dict() - cfg['rabbitmq'] = dict() cfg['rabbitmq_host'] = args.host cfg['rabbitmq_port'] = args.port cfg['rabbitmq_exchange'] = args.exchange cfg['rabbitmq_queue'] = args.queue - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' - cfg['rabbitmq']['queue_logs_name'] = 'session_logs' - cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' - cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' + cfg['rabbitmq'] = dict() + cfg['rabbitmq']['user'] = args.user + cfg['rabbitmq']['password'] = args.password + logging.getLogger().setLevel(args.loglevel) logging.info('Starting...') @@ -265,7 +279,7 @@ while True: try: - DocIndexer(cfg).main() + main(cfg) except pika.exceptions.ChannelClosed: logging.info("Waiting for tasks...") time.sleep(5) diff --git a/workers/process-logger.py b/workers/process-logger.py index
9929a5ce0c8f6e39c911a9e973ef1866b2f08fcc..1307b7e4e13d0619bf47e40d315e9f06ec6a94f9 100644 --- a/workers/process-logger.py +++ b/workers/process-logger.py @@ -36,6 +36,10 @@ if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description='Document indexer') parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) + parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) + parser.add_argument('--mongouser', dest='mongouser', help='the MongoDB user login', type=str, required=True) + parser.add_argument('--mongopassword', dest='mongopassword', help='the MongoDB user password', type=str, required=True) args = parser.parse_args() # logging.info('Starting...') @@ -46,16 +50,16 @@ cfg['rabbitmq']['host'] = 'rabbitmq' cfg['rabbitmq']['port'] = 5672 - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' + cfg['rabbitmq']['user'] = args.user + cfg['rabbitmq']['password'] = args.password cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' cfg['rabbitmq']['queue_logs_name'] = 'session_logs' cfg['mongo']['host'] = 'mongo' cfg['mongo']['database'] = 'indexerdb' cfg['mongo']['port'] = 27017 - cfg['mongo']['user'] = 'root' - cfg['mongo']['password'] = 'example' + cfg['mongo']['user'] = args.mongouser + cfg['mongo']['password'] = args.mongopassword cfg['mongo']['collection'] = 'indexer_logs' # logging.getLogger().setLevel(args.loglevel)
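
Note: in both workers/doc-enricher.py and workers/doc-indexer.py the restored main(cfg) still builds pika.ConnectionParameters from host and port only, so the values parsed from --user/--password end up in cfg['rabbitmq'] but never reach the broker connection, and pika falls back to its default guest/guest credentials. Below is a minimal sketch of how the parsed credentials could be wired into the connection, assuming no other connection options are needed; the connect() helper is hypothetical and not part of the repository.

import pika

def connect(cfg):
    # Hypothetical helper: reuse the credentials parsed from --user/--password
    # instead of pika's default guest/guest account.
    credentials = pika.PlainCredentials(cfg['rabbitmq']['user'],
                                        cfg['rabbitmq']['password'])
    parameters = pika.ConnectionParameters(host=cfg['rabbitmq_host'],
                                           port=cfg['rabbitmq_port'],
                                           credentials=credentials)
    return pika.BlockingConnection(parameters)

Each worker's main(cfg) could open its connection through such a helper; docker-compose-workers.yml already injects RABBIT_USER and RABBIT_PASSWORD from the .env file, so no further compose change would be needed.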