Commit 0c1c2a49 authored by Damien DESPRES

Merge branch 'development' into 'master'

Merge development into master

See merge request !15
parents 621d514c b0436110
Pipeline #18608 passed with stages in 48 seconds
@@ -48,7 +48,7 @@ def protocol_to_formats_and_services( links ):
             output[k]['formats'] = ['KML']
             output[k]['service'] = 'KML'
         elif link['protocol'] == 'WS':
-            output[k]['formats'] = ['JSON', 'ShapeFile']
+            output[k]['formats'] = ['JSON', 'ShapeFile', 'CSV']
             output[k]['service'] = 'WS'
         elif link['protocol'] == 'SOS':
             output[k]['formats'] = ['JSON', 'XML']
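This change adds 'CSV' to the download formats advertised for the 'WS' protocol. Since the if/elif chain grows with every protocol, a data-driven alternative could keep it flat; the sketch below is illustrative only (the PROTOCOL_MAP table is an assumption, and the 'SOS' service value is guessed from the pattern, neither is part of this merge request):

# Hypothetical lookup table replacing the if/elif chain; values are copied
# from the visible context lines, except the assumed 'SOS' service name.
PROTOCOL_MAP = {
    'KML': {'formats': ['KML'], 'service': 'KML'},
    'WS':  {'formats': ['JSON', 'ShapeFile', 'CSV'], 'service': 'WS'},
    'SOS': {'formats': ['JSON', 'XML'], 'service': 'SOS'},
}

def protocol_to_formats_and_services(links):
    output = {}
    for k, link in enumerate(links):
        entry = PROTOCOL_MAP.get(link['protocol'])
        if entry is not None:
            # copy the list so callers cannot mutate the shared table
            output[k] = {'formats': list(entry['formats']), 'service': entry['service']}
    return output

With this shape, adding a format such as 'CSV' becomes a one-line data change instead of an edit inside control flow.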
@@ -138,7 +138,7 @@ def enrich_docs( channel, method, properties, body ):
     channel.exchange_declare(exchange=exchange, exchange_type='direct')
-    channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
+    dest_queue = channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
     channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk)
@@ -178,7 +178,16 @@ def enrich_docs( channel, method, properties, body ):
         msg['body'] = doc_page
         the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
+        dest_queue = channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}, passive=True)
+        q_len = dest_queue.method.message_count
+        logging.info('Queue Size is %i for %s...' % ( q_len, doc_pages_to_process_qn))
+        while q_len > 50:
+            logging.info('Waiting for next queue to process elements...')
+            time.sleep(5)
+            dest_queue = channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}, passive=True)
+            q_len = dest_queue.method.message_count
+            logging.info('Queue Size is %i for %s...' % ( q_len, doc_pages_to_process_qn))
         channel.basic_publish( exchange=exchange,
                                routing_key=doc_pages_to_process_rk,
                                body=the_body,
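The nine added lines throttle the producer: a passive queue_declare (pika answers with a DeclareOk frame whose method.message_count gives the current queue depth) is polled every 5 seconds until the destination queue drops back to 50 messages or fewer. A self-contained sketch of the same pattern, with an illustrative helper name and parameters that are not in the source:

import time
import logging

def wait_until_queue_drains(channel, queue_name, max_len=50, poll_s=5):
    # passive=True only inspects an existing queue (no redeclaration);
    # the broker replies with a DeclareOk frame carrying message_count
    frame = channel.queue_declare(queue=queue_name, passive=True)
    while frame.method.message_count > max_len:
        logging.info('Queue %s holds %i messages, waiting...',
                     queue_name, frame.method.message_count)
        time.sleep(poll_s)
        frame = channel.queue_declare(queue=queue_name, passive=True)

Note that with passive=True the broker only checks that the queue exists, so the durable and arguments parameters repeated on the probe calls in the diff have no effect there and could be dropped.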
@@ -24,7 +24,7 @@ def tag_doc( the_doc ):
     # tag_dict[tag] = False
     # isOpen?
-    if 'license' in the_doc['metadata-fr'].keys() and not any( [x in the_doc['metadata-fr']['license'] for x in ["Licence de réutilisation des données d'intérêt général"] ] ):
+    if 'license' in the_doc['metadata-fr'].keys() and not any( [x in the_doc['metadata-fr']['license'] for x in ["Licence de réutilisation des données d'intérêt général","Accord de réutilisation de données en accès privé et exclusif"] ] ):
         tag_dict['isOpenAccess'] = True
     else:
         tag_dict['isOpenAccess'] = False
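The isOpenAccess tag is therefore true unless the record's license string contains one of the restricted labels, which now also include the private/exclusive reuse agreement. A tiny illustration of the predicate (the sample inputs are made up):

RESTRICTED_LABELS = [
    "Licence de réutilisation des données d'intérêt général",
    "Accord de réutilisation de données en accès privé et exclusif",
]

def is_open_access(license_text):
    # open unless a restricted label occurs as a substring of the license
    return not any(label in license_text for label in RESTRICTED_LABELS)

print(is_open_access('Licence Ouverte'))                  # True
print(is_open_access('Accord de réutilisation de données '
                     'en accès privé et exclusif - v2'))  # False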
@@ -115,7 +115,7 @@ def index_docs(channel, method, properties, body):
     rep = es.bulk(body=es_body)
-    t2 = time.time()
+    t2 = time.time()
     if rep['errors'] == False:
         channel.basic_ack(delivery_tag = method.delivery_tag)
@@ -140,7 +140,7 @@ def process_docs( channel, method, properties, body ):
     del cfg['rabbitmq']['routing_key_2']
     channel.exchange_declare(exchange=exchange, exchange_type='direct')
-    channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
+    dest_queue = channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
     channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk)
     logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug']))
@@ -166,8 +166,16 @@ def process_docs( channel, method, properties, body ):
         the_body = msgpack.packb(msg, use_bin_type=True)
+        dest_queue = channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}, passive=True)
+        q_len = dest_queue.method.message_count
+        logging.info('Queue Size is %i for %s...' % ( q_len, docs_to_index_qn))
+        while q_len > 50:
+            logging.info('Waiting for next queue to process elements...')
+            time.sleep(5)
+            dest_queue = channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}, passive=True)
+            q_len = dest_queue.method.message_count
+            logging.info('Queue Size is %i for %s...' % ( q_len, docs_to_index_qn))
         channel.basic_publish( exchange=exchange,
                                routing_key=docs_to_index_rk,
                                body=the_body,
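This throttling block is a near-verbatim copy of the one added to enrich_docs above (only the queue name differs); both call sites could share a helper along the lines of the wait_until_queue_drains sketch shown earlier.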
@@ -65,16 +65,19 @@ def list_to_dictlist( the_input, the_context=None ):
         out_item = {}
         for k, line in enumerate(in_item_split):
-            if line != "":
-                if the_context != None:
-                    out_item[ dictionary[the_context][k] ] = line
-                if the_context and k in dictionary[the_context].keys():
+            if line != "":
+                if the_context and dictionary[the_context].get(k):
+                    out_item[ dictionary[the_context][k] ] = line
+                else:
+                    out_item[ k ] = line
-                else:
-                    out_item[ k ] = line
             else:
-                out_item[ dictionary[the_context][k] ] = 'null'
+                if dictionary[the_context].get(k):
+                    out_item[ dictionary[the_context][k] ] = 'null'
-        logging.debug(the_context)
-        logging.debug(out_item)
+        logging.debug("context : %s", the_context)
+        logging.debug("input: %s", in_item)
+        logging.debug("result : %s", out_item)
         # appending a hash value of the item can be useful at client-side,
         # as it allows one to identify entities that are common to multiple datasets...
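The rewritten branches replace the bare `the_context != None` test and direct indexing with dictionary[the_context].get(k), so a column index with no mapping in the context table falls back to the raw index instead of raising a KeyError, and 'null' placeholders are only written for mapped columns. A worked example with a made-up context table:

dictionary = {'address': {0: 'street', 1: 'city'}}  # made-up context mapping
the_context = 'address'
in_item_split = ['12 rue X', '', 'extra']

out_item = {}
for k, line in enumerate(in_item_split):
    if line != "":
        if the_context and dictionary[the_context].get(k):
            out_item[dictionary[the_context][k]] = line
        else:
            out_item[k] = line    # unmapped column: keep the raw index
    else:
        if dictionary[the_context].get(k):
            out_item[dictionary[the_context][k]] = 'null'

print(out_item)  # {'street': '12 rue X', 'city': 'null', 2: 'extra'}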
@@ -184,7 +187,7 @@ def process_record( in_record, working_directory, credentials ):
     out_record['metadata-fr']['license'] = 'unknown'
     if 'legalConstraints' in out_record['metadata-fr'].keys():
         for el in out_record['metadata-fr']['legalConstraints']:
-            if "licence" in el.lower():
+            if "licence" in el.lower() or "accord" in el.lower():
                 out_record['metadata-fr']['license'] = el
     if 'resourceConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['resourceConstraints']) is str:
@@ -420,6 +423,6 @@ if __name__ == '__main__':
             logging.info('Waiting for RabbitMQ to be reachable...')
             time.sleep(5)
         except Exception as e:
-            logging.error(e)
+            logging.exception("General error (Aborting): %s", e)
             time.sleep(5)
             exit(1)
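Unlike logging.error(e), logging.exception appends the active traceback to the message (it is equivalent to logging.error(..., exc_info=True)) and is meant to be called from inside an except block. A minimal sketch, where connect_to_rabbitmq is a hypothetical stand-in for the worker's setup code:

import logging
import sys

def connect_to_rabbitmq():
    # hypothetical stand-in that fails like an unreachable broker
    raise ConnectionError('broker unreachable')

try:
    connect_to_rabbitmq()
except Exception as e:
    # logs at ERROR level and automatically appends the full traceback
    logging.exception('General error (Aborting): %s', e)
    sys.exit(1)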