# 3-doc-enricher.py
# Author: Alessandro Cerioni
import pika
import msgpack
import requests
import json
from utils.exit_gracefully import exit_gracefully
from utils.my_logging import logging
def get_wfs( link, offset=0, no_features_per_page=1000 ):
    """Fetch one page of features from a WFS endpoint.

    Args:
        link: dict with at least 'url' (service root URL) and 'name'
            (the WFS typeName to query).
        offset: start index of the page to fetch (WFS 'startindex').
        no_features_per_page: page size (WFS 'maxFeatures').

    Returns:
        A list of GeoJSON features, or None if the request failed — for
        instance because of insufficient access rights, in which case the
        response body is not JSON.
    """
    root_url = link['url']
    params = {
        'version': '2.0.0',
        'service': 'WFS',
        'outputFormat': 'geojson',
        'request': 'GetFeature',
        'maxFeatures': no_features_per_page,
        'typeName': link['name'],
        # TODO: is it safe not to force any sortBy?
        'SRSNAME': 'epsg:4326',
        'startindex': offset,
    }

    # integer division: the page counter is a whole number, logged with %i
    cnt = offset // no_features_per_page + 1
    logging.info('WFS page %i; offset = %i' % (cnt, offset))

    res = requests.get(root_url, params=params)
    logging.debug(res.url)

    try:
        # ValueError covers a non-JSON body (requests raises a ValueError
        # subclass on decode failure); KeyError covers a JSON reply with
        # no 'features' key (e.g. a WFS exception report).
        features = res.json()['features']
        logging.debug(len(features))
        return features
    except (ValueError, KeyError):
        logging.error("Failed WFS request: %s" % res.url)
        return None
#print()
def enrich_docs( channel, method, properties, body, **kwargs ):
    """RabbitMQ callback: enrich one metadata document with a page of WFS features.

    Unpacks the incoming msgpack message, fetches one page of features via
    get_wfs, and:
      * if the page is full, publishes a follow-up task (next offset) so that
        this program or a sibling handles the next page;
      * if a page was obtained, fans the resulting doc page out to both the
        MongoDB-storage queue and the processing queue.
    Finally acks the incoming message.

    Expected kwargs: 'exchange', 'docs_to_enrich_rk',
    'doc_pages_to_store_in_mongo_rk', 'doc_pages_to_process_rk',
    'features_per_page'.
    """
    decoded_body = msgpack.unpackb(body, raw=False)

    wfs_info   = decoded_body['header']['wfs_info']
    offset     = decoded_body['header']['offset']
    session_id = decoded_body['header']['session_id']
    dest_index = decoded_body['header']['dest_index']

    logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])

    feature_page = get_wfs(wfs_info, offset, kwargs['features_per_page'])

    # we implement pagination by letting this program create tasks for
    # itself / its siblings: a full page means at least another page is needed
    if feature_page is not None and len(feature_page) == kwargs['features_per_page']:
        msg = {'header': {'wfs_info': wfs_info,
                          'offset': offset + kwargs['features_per_page'],
                          'session_id': session_id,
                          'dest_index': dest_index},
               'body': decoded_body['body']}
        the_body = msgpack.packb(msg, use_bin_type=True)
        channel.basic_publish(exchange=kwargs['exchange'],
                              routing_key=kwargs['docs_to_enrich_rk'],
                              body=the_body,
                              properties=pika.BasicProperties(delivery_mode=2))

    if feature_page is not None:
        logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' % len(feature_page))
        # one output document per feature: metadata merged with the feature data
        doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page]
        msg = {'header': {'metadata': decoded_body['body'],
                          'session_id': session_id,
                          'dest_index': dest_index},
               'body': doc_page}
        the_body = msgpack.packb(msg, use_bin_type=True)
        # fan the same page out to both the Mongo-storage and processing queues
        for routing_key in (kwargs['doc_pages_to_store_in_mongo_rk'],
                            kwargs['doc_pages_to_process_rk']):
            channel.basic_publish(exchange=kwargs['exchange'],
                                  routing_key=routing_key,
                                  body=the_body,
                                  properties=pika.BasicProperties(delivery_mode=2))
        logging.info('...done!')

    channel.basic_ack(delivery_tag=method.delivery_tag)
    return
def main(cfg):
    """Connect to RabbitMQ, declare/bind the output queues and consume enrichment tasks."""
    session_id = cfg['session']['id']
    rabbit_cfg = cfg['rabbitmq']

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_cfg['host']))
    channel = connection.channel()
    exchange = rabbit_cfg['exchange']

    # the queue this program will consume messages from:
    docs_to_enrich_qn = session_id + '_' + rabbit_cfg['queue_name_3_suffix']
    # this program will generate tasks for itself and/or its siblings:
    docs_to_enrich_rk = session_id + '_' + rabbit_cfg['routing_key_3_suffix']

    doc_pages_to_store_in_mongo_qn = session_id + '_' + rabbit_cfg['queue_name_4_suffix']
    doc_pages_to_store_in_mongo_rk = session_id + '_' + rabbit_cfg['routing_key_4_suffix']

    doc_pages_to_process_qn = session_id + '_' + rabbit_cfg['queue_name_5_suffix']
    doc_pages_to_process_rk = session_id + '_' + rabbit_cfg['routing_key_5_suffix']

    # declare and bind both output queues
    for queue_name, routing_key in ((doc_pages_to_store_in_mongo_qn, doc_pages_to_store_in_mongo_rk),
                                    (doc_pages_to_process_qn, doc_pages_to_process_rk)):
        channel.queue_declare(queue=queue_name, durable=True)
        channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)

    channel.basic_qos(prefetch_count=1)

    def on_message(ch, method, properties, body):
        # forward the message to the worker, together with the routing config
        enrich_docs(ch, method, properties, body,
                    exchange=exchange,
                    doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk,
                    docs_to_enrich_rk=docs_to_enrich_rk,
                    doc_pages_to_process_rk=doc_pages_to_process_rk,
                    features_per_page=cfg['wfs']['features_per_page'])

    channel.basic_consume(on_message, queue=docs_to_enrich_qn)
    channel.start_consuming()
    connection.close()
if __name__ == '__main__':
    import yaml
    import time
    import signal

    signal.signal(signal.SIGINT, exit_gracefully)

    with open("config.yaml", 'r') as yamlfile:
        # safe_load: avoids arbitrary-object construction and the
        # "yaml.load() without Loader" deprecation warning
        cfg = yaml.safe_load(yamlfile)

    # keep retrying: a closed channel usually means the input queue does not
    # exist yet, so wait for the upstream stage to create it
    while True:
        try:
            main(cfg)
        except pika.exceptions.ChannelClosed:
            logging.info('Waiting for tasks...')
            time.sleep(5)
        except Exception as e:
            # a fatal error: log at ERROR level (not INFO) before exiting
            logging.error(e)
            exit(1)