3-doc-enricher.py
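
    # Overview: this worker consumes "docs to enrich" tasks from RabbitMQ, fetches the
    # corresponding features from PostGIS (get_entries_from_postgis) -- or, in the legacy
    # code path, from a WFS endpoint (get_wfs) -- merges each feature with the dataset
    # metadata carried by the message, and republishes the resulting document pages for
    # downstream processing.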
    import pika
    import msgpack
    import requests
    import json
    
    import datetime
    #from decimal import Decimal
    
    from utils.exit_gracefully import exit_gracefully
    from utils.my_logging import logging
    
    from utils.postgis_helper import Remote
    from utils.serializers import encode_datetime
    
    
    def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):

        dbname = link['url'].split('/')[-1]
        schema, table_name = link['name'].split('.')
        logging.debug('%s %s %s' % (dbname, schema, table_name))

        logging.info('Getting data from database %s...' % dbname)
        logging.info('Establishing a database connection...')
        pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password'])
        logging.info('Done.')

        table = pg.get_table(table_name, schema=schema)

        count = pg.count_entries(table)

        no_pages = count//no_features_per_page+1
        logging.info('Getting %i entries in %i pages from table %s.%s...' % (count, no_pages, schema, table_name))
    
        feature_page = [] # entries are buffered into pages of no_features_per_page items
        cnt = 0
        for entry in pg.get_entries(table):
            feature_page.append(entry)
            if len(feature_page) == no_features_per_page:
                cnt += 1
                logging.debug('Yielding page %i/%i' % (cnt, no_pages))
                yield feature_page
                feature_page = []

        logging.debug('Yielding the last page, %i features' % len(feature_page))
        yield feature_page # the last, possibly partial, page

        return
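
    # Usage sketch (illustrative values only; the connection parameters and the
    # 'my_schema.my_table' layer below are placeholders, not a real deployment):
    #
    #   link = {'url': 'postgresql://dbhost/my_db', 'name': 'my_schema.my_table'}
    #   pg_cfg = {'host': 'dbhost', 'username': 'user', 'password': 'secret'}
    #   for page in get_entries_from_postgis(link, pg_cfg, no_features_per_page=1000):
    #       print(len(page))  # each page holds up to 1000 entries; the last one may be shorter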
    
    
    
    def get_wfs( link, offset=0, no_features_per_page=1000 ):
    
        root_url = link['url']
    
        params = {}
        params['version'] = '2.0.0'
        params['service'] = 'WFS'
        params['outputFormat'] = 'geojson'
        params['request'] = 'GetFeature'
        params['maxFeatures'] = no_features_per_page
        params['typeName'] = link['name']
        # params['sortBy'] = 'gid' # TODO: is it safe not to force any sortBy?
        params['SRSNAME'] = 'epsg:4326'

        cnt = offset // no_features_per_page + 1

        logging.info('WFS page %i; offset = %i' % (cnt, offset))
        params['startindex'] = offset
    
        res = requests.get(root_url, params = params)
    
        logging.debug(res.url)
    
        try:
            features = res.json()['features']
            logging.debug(len(features))

            return features

        except (ValueError, KeyError): # the WFS request failed, for instance because of insufficient access rights
            logging.error("Failed WFS request: %s" % res.url)

            return None
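
    # For illustration, the request built above resolves to a GetFeature URL of this shape
    # (hostname and layer name are placeholders, not real endpoints):
    #
    #   https://example.org/wfs?version=2.0.0&service=WFS&outputFormat=geojson
    #       &request=GetFeature&maxFeatures=1000&typeName=my_schema.my_layer
    #       &SRSNAME=epsg:4326&startindex=0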
    
    def old_enrich_docs( channel, method, properties, body, **kwargs ):
    
    
        decoded_body = msgpack.unpackb(body, raw=False)
    
        wfs_info   = decoded_body['header']['wfs_info']
        offset     = decoded_body['header']['offset']
        session_id = decoded_body['header']['session_id']
        dest_index = decoded_body['header']['dest_index']
    
    
        logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])
    
    
        feature_page = get_wfs(wfs_info, offset, kwargs['features_per_page'])
    
        #feature_page = get_entries_from_postgis(wfs_info, kwargs['postgis_cfg'])
    
    
        # pagination is implemented by letting this program create tasks for itself / its siblings
        if feature_page is not None and len(feature_page) == kwargs['features_per_page']: # at least one more page is needed
            msg = {'header': {'wfs_info': wfs_info, 'offset': offset+kwargs['features_per_page'], 'session_id': session_id, 'dest_index': dest_index}, 'body': decoded_body['body']}
            the_body = msgpack.packb(msg, use_bin_type=True)
    
            channel.basic_publish( exchange=kwargs['exchange'],
                                   routing_key=kwargs['docs_to_enrich_rk'],
                                   body=the_body,
                                   properties=pika.BasicProperties(delivery_mode = 2)
                                 )
    
    
        if feature_page is not None:

            logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' % len(feature_page))
            doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page]
    
            msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page}
            the_body = msgpack.packb(msg, use_bin_type=True)
    
    
            channel.basic_publish( exchange=kwargs['exchange'],
                                   routing_key=kwargs['doc_pages_to_store_in_mongo_rk'],
                                   body=the_body,
                                   properties=pika.BasicProperties(delivery_mode = 2)
                                 )
    
    
            channel.basic_publish( exchange=kwargs['exchange'],
                                   routing_key=kwargs['doc_pages_to_process_rk'],
                                   body=the_body,
                                   properties=pika.BasicProperties(delivery_mode = 2)
                                 )
    
            logging.info('...done!')
    
            # except TypeError: # it means that getWFS returned None
            #     pass
            #     #
            #     #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
    
    
    
        channel.basic_ack(delivery_tag = method.delivery_tag)
        #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
    
        return #out_docs
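
    # The messages consumed by enrich_docs below are msgpack-encoded envelopes; the shape is
    # inferred from the fields accessed in the code and the values are purely illustrative.
    # Despite its name, 'wfs_info' is passed to get_entries_from_postgis as the dataset link:
    #
    #   {
    #       'header': {
    #           'wfs_info':   {'url': 'postgresql://dbhost/my_db', 'name': 'my_schema.my_table'},
    #           'offset':     0,
    #           'session_id': 'some-session-id',
    #           'dest_index': 'some-destination-index'
    #       },
    #       'body': {'metadata-fr': {'title': 'Dataset title'}}
    #   }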
    
    
    
    def enrich_docs( channel, method, properties, body, **kwargs ):

        decoded_body = msgpack.unpackb(body, raw=False)
    
        wfs_info   = decoded_body['header']['wfs_info']
        offset     = decoded_body['header']['offset']
        session_id = decoded_body['header']['session_id']
        dest_index = decoded_body['header']['dest_index']
    
    
        logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])
    
        for feature_page in get_entries_from_postgis(wfs_info, kwargs['postgis_cfg']):
            logging.info('Sending feature page of len = %i to RabbitMQ...' % len(feature_page))

            doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page]
    
            msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page}
            the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
    
            # N.B. unlike old_enrich_docs, pages are no longer published to the MongoDB queue here;
            # they go straight to the processing queue below.
    
            channel.basic_publish( exchange=kwargs['exchange'],
                                   routing_key=kwargs['doc_pages_to_process_rk'],
                                   body=the_body,
                                   properties=pika.BasicProperties(delivery_mode = 2)
                                 )
    
            logging.info('...done!')
    
        channel.basic_ack(delivery_tag = method.delivery_tag)
        #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
    
        return #out_docs
    
    
    
    def main(cfg):
    
    
        from utils.close_connection import on_timeout
    
    
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq']['host']))
    
        timeout = 5
        connection.add_timeout(timeout, on_timeout(connection))
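        # N.B. add_timeout() is the pre-1.0 pika API; in pika >= 1.0 it was replaced by
        # connection.call_later(delay, callback), so the line above assumes an older pika release.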
    
    
        channel = connection.channel()
        exchange    = cfg['rabbitmq']['exchange']
        # the queue this program will consume messages from:
        docs_to_enrich_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_3_suffix']
        # this program will generate tasks for itself and/or its siblings:
        docs_to_enrich_rk = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_3_suffix']
    
    
    
        doc_pages_to_store_in_mongo_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_4_suffix']
        doc_pages_to_store_in_mongo_rk = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_4_suffix']
    
        doc_pages_to_process_qn = cfg['session']['id'] + '_' + cfg['rabbitmq']['queue_name_5_suffix']
        doc_pages_to_process_rk = cfg['session']['id'] + '_' + cfg['rabbitmq']['routing_key_5_suffix']
    
    
        channel.exchange_declare(exchange=cfg['rabbitmq']['exchange'], exchange_type='direct')
    
    
        channel.queue_declare(queue=doc_pages_to_store_in_mongo_qn, durable=True)
        channel.queue_bind(exchange=exchange, queue=doc_pages_to_store_in_mongo_qn, routing_key=doc_pages_to_store_in_mongo_rk)
    
        channel.queue_declare(queue=doc_pages_to_process_qn, durable=True)
        channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk)
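
        # Only the two output queues are declared and bound here; docs_to_enrich_qn, the queue this
        # program consumes from, is presumably declared by the upstream component that fills it.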
    
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(lambda ch, method, properties, body:
                                enrich_docs(ch, method, properties, body,
                                            exchange=exchange,
                                            doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk,
                                            docs_to_enrich_rk=docs_to_enrich_rk,
                                            doc_pages_to_process_rk=doc_pages_to_process_rk,
    
                                            features_per_page=cfg['wfs']['features_per_page'],
                                            postgis_cfg=cfg['postgis']),
    
                                queue=docs_to_enrich_qn)#, no_ack=True)
        channel.start_consuming()
        connection.close()
    
    
    if __name__ == '__main__':
    
    
        import yaml
        import time
        import signal
        signal.signal(signal.SIGINT, exit_gracefully)
    
        with open("config.yaml", 'r') as yamlfile:
            cfg = yaml.load(yamlfile)
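
        # config.yaml is expected to provide at least the keys accessed in this script; the sketch
        # below is illustrative only (all names and values are placeholders, not a real setup):
        #
        #   session:
        #     id: my-session
        #   rabbitmq:
        #     host: localhost
        #     exchange: my-exchange
        #     queue_name_3_suffix: docs_to_enrich
        #     routing_key_3_suffix: docs_to_enrich
        #     queue_name_4_suffix: doc_pages_to_store_in_mongo
        #     routing_key_4_suffix: doc_pages_to_store_in_mongo
        #     queue_name_5_suffix: doc_pages_to_process
        #     routing_key_5_suffix: doc_pages_to_process
        #   wfs:
        #     features_per_page: 1000
        #   postgis:
        #     host: dbhost
        #     username: user
        #     password: secret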
    
        while True:
            try:
                main(cfg)
            except pika.exceptions.ChannelClosed:
                logging.info('Waiting for tasks...')
                time.sleep(5)
            except Exception as e:
                logging.error(e)
                exit(1)