import pika
import msgpack
import requests
import json

from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
from utils.postgis_helper import Remote
from utils.serializers import encode_datetime


def get_entries_from_postgis(link, cfg, no_features_per_page=1000):

    dbname = link['url'].split('/')[-1]
    schema, table_name = link['name'].split('.')

    logging.info('Getting data from database %s...' % dbname)
    logging.info('Establishing a database connection...')
    pg = Remote(hostname=cfg['host'], dbname=dbname,
                username=cfg['username'], password=cfg['password'])
    logging.info('Done.')

    table = pg.get_table(table_name, schema=schema)
    count = pg.count_entries(table)
    no_pages = count // no_features_per_page + 1
    logging.info('Getting %i entries in %i pages from table %s.%s...'
                 % (count, no_pages, schema, table_name))

    feature_page = []  # we accumulate entries in this buffer
    cnt = 0
    for entry in pg.get_entries(table):
        feature_page.append(entry)
        if len(feature_page) == no_features_per_page:
            cnt += 1
            logging.debug('Yielding page %i/%i' % (cnt, no_pages))
            yield feature_page
            feature_page = []

    logging.debug('Yielding last page of len = %i' % len(feature_page))
    yield feature_page  # this will be the last, possibly incomplete, page

    return


def get_wfs(link, offset=0, no_features_per_page=1000):

    root_url = link['url']

    params = {
        'version': '2.0.0',
        'service': 'WFS',
        'outputFormat': 'geojson',
        'request': 'GetFeature',
        'maxFeatures': no_features_per_page,
        'typeName': link['name'],
        # 'sortBy': 'gid',  # TODO: is it safe not to force any sortBy?
        'startindex': offset,
        'SRSNAME': 'epsg:4326'
    }

    cnt = offset // no_features_per_page + 1
    logging.info('WFS page %i; offset = %i' % (cnt, offset))

    res = requests.get(root_url, params=params)
    logging.debug(res.url)

    try:
        features = res.json()['features']
        logging.debug(len(features))
        return features
    except (json.decoder.JSONDecodeError, KeyError):
        # the WFS request failed, for instance because of insufficient access rights
        logging.error('Failed WFS request: %s' % res.url)
        return None
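
# For reference, the GetFeature request issued by get_wfs() resolves to a URL of
# the following shape (hypothetical host and layer name, for illustration only):
#
#   https://wfs.example.org/geoserver/wfs?version=2.0.0&service=WFS
#       &outputFormat=geojson&request=GetFeature&maxFeatures=1000
#       &typeName=ms:some_layer&startindex=0&SRSNAME=epsg:4326
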
def old_enrich_docs(channel, method, properties, body, **kwargs):

    decoded_body = msgpack.unpackb(body, raw=False)

    wfs_info = decoded_body['header']['wfs_info']
    offset = decoded_body['header']['offset']
    session_id = decoded_body['header']['session_id']
    dest_index = decoded_body['header']['dest_index']

    logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])

    feature_page = get_wfs(wfs_info, offset, kwargs['features_per_page'])

    # we implement pagination by letting this program create tasks for itself / its siblings
    if feature_page is not None and len(feature_page) == kwargs['features_per_page']:
        # at least another page is needed
        msg = {'header': {'wfs_info': wfs_info,
                          'offset': offset + kwargs['features_per_page'],
                          'session_id': session_id,
                          'dest_index': dest_index},
               'body': decoded_body['body']}
        the_body = msgpack.packb(msg, use_bin_type=True)

        channel.basic_publish(exchange=kwargs['exchange'],
                              routing_key=kwargs['docs_to_enrich_rk'],
                              body=the_body,
                              properties=pika.BasicProperties(delivery_mode=2))

    if feature_page is not None:

        logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' % len(feature_page))

        doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page]
        msg = {'header': {'metadata': decoded_body['body'],
                          'session_id': session_id,
                          'dest_index': dest_index},
               'body': doc_page}
        the_body = msgpack.packb(msg, use_bin_type=True)

        channel.basic_publish(exchange=kwargs['exchange'],
                              routing_key=kwargs['doc_pages_to_store_in_mongo_rk'],
                              body=the_body,
                              properties=pika.BasicProperties(delivery_mode=2))

        channel.basic_publish(exchange=kwargs['exchange'],
                              routing_key=kwargs['doc_pages_to_process_rk'],
                              body=the_body,
                              properties=pika.BasicProperties(delivery_mode=2))

        logging.info('...done!')

    channel.basic_ack(delivery_tag=method.delivery_tag)

    return
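
# N.B.: encode_datetime (imported from utils.serializers) is passed below as
# msgpack's default= fallback encoder. Its actual implementation is not shown
# in this file; a minimal sketch of such a hook, assuming datetimes are
# serialized as ISO 8601 strings, could read:
#
#   def encode_datetime(obj):
#       if isinstance(obj, datetime.datetime):
#           return obj.isoformat()
#       raise TypeError('%r is not serializable' % obj)
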
def enrich_docs(channel, method, properties, body, **kwargs):

    decoded_body = msgpack.unpackb(body, raw=False)

    wfs_info = decoded_body['header']['wfs_info']
    session_id = decoded_body['header']['session_id']
    dest_index = decoded_body['header']['dest_index']

    logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])

    # unlike old_enrich_docs, features are read from PostGIS rather than through the WFS
    for feature_page in get_entries_from_postgis(wfs_info, kwargs['postgis_cfg']):

        logging.info('Sending feature page of len = %i to RabbitMQ...' % len(feature_page))

        doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page]
        msg = {'header': {'metadata': decoded_body['body'],
                          'session_id': session_id,
                          'dest_index': dest_index},
               'body': doc_page}
        # datetime values are not serializable by msgpack: encode_datetime is the fallback
        the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)

        # N.B. the MongoDB storage route is disabled here; doc pages go straight to processing
        channel.basic_publish(exchange=kwargs['exchange'],
                              routing_key=kwargs['doc_pages_to_process_rk'],
                              body=the_body,
                              properties=pika.BasicProperties(delivery_mode=2))

        logging.info('...done!')

    channel.basic_ack(delivery_tag=method.delivery_tag)

    return


def main(cfg):

    from utils.close_connection import on_timeout

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    timeout = 5
    connection.add_timeout(timeout, on_timeout(connection))

    channel = connection.channel()
    exchange = cfg['rabbitmq']['exchange']

    # the queue this program will consume messages from:
    docs_to_enrich_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_3_suffix']

    # this program will generate tasks for itself and/or its siblings:
    docs_to_enrich_rk = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_3_suffix']

    doc_pages_to_store_in_mongo_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_4_suffix']
    doc_pages_to_store_in_mongo_rk = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_4_suffix']

    doc_pages_to_process_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_5_suffix']
    doc_pages_to_process_rk = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_5_suffix']

    channel.exchange_declare(exchange=exchange, exchange_type='direct')

    channel.queue_declare(queue=doc_pages_to_store_in_mongo_qn, durable=True)
    channel.queue_bind(exchange=exchange, queue=doc_pages_to_store_in_mongo_qn,
                       routing_key=doc_pages_to_store_in_mongo_rk)

    channel.queue_declare(queue=doc_pages_to_process_qn, durable=True)
    channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn,
                       routing_key=doc_pages_to_process_rk)

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(lambda ch, method, properties, body:
                          enrich_docs(ch, method, properties, body,
                                      exchange=exchange,
                                      doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk,
                                      docs_to_enrich_rk=docs_to_enrich_rk,
                                      doc_pages_to_process_rk=doc_pages_to_process_rk,
                                      features_per_page=cfg['wfs']['features_per_page'],
                                      postgis_cfg=cfg['postgis']),
                          queue=docs_to_enrich_qn)

    channel.start_consuming()
    connection.close()


if __name__ == '__main__':

    import yaml
    import time
    import signal

    signal.signal(signal.SIGINT, exit_gracefully)

    with open("config.yaml", 'r') as yamlfile:
        cfg = yaml.safe_load(yamlfile)

    while True:
        try:
            main(cfg)
        except pika.exceptions.ChannelClosed:
            logging.info('Waiting for tasks...')
            time.sleep(5)
        except Exception as e:
            logging.error(e)
            exit(1)
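
# The config.yaml read above is expected to provide the following keys (all
# values are placeholders, for illustration only):
#
#   session:
#     id: some_session_id
#   rabbitmq:
#     host: rabbitmq.example.org
#     exchange: some_exchange
#     queue_name_3_suffix: docs_to_enrich
#     routing_key_3_suffix: docs_to_enrich
#     queue_name_4_suffix: doc_pages_to_store_in_mongo
#     routing_key_4_suffix: doc_pages_to_store_in_mongo
#     queue_name_5_suffix: doc_pages_to_process
#     routing_key_5_suffix: doc_pages_to_process
#   wfs:
#     features_per_page: 1000
#   postgis:
#     host: postgis.example.org
#     username: some_user
#     password: some_password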