diff --git a/lib/fix_links.py b/lib/fix_links.py
index 95b71c571111761824e266a0b3576332fa8e6ded..68d1ee82fb70d765584f069eb55afa36b769a6b9 100644
--- a/lib/fix_links.py
+++ b/lib/fix_links.py
@@ -48,7 +48,7 @@ def protocol_to_formats_and_services( links ):
             output[k]['formats'] = ['KML']
             output[k]['service'] = 'KML'
         elif link['protocol'] == 'WS':
-            output[k]['formats'] = ['JSON', 'ShapeFile']
+            output[k]['formats'] = ['JSON', 'ShapeFile', 'CSV']
             output[k]['service'] = 'WS'
         elif link['protocol'] == 'SOS':
             output[k]['formats'] = ['JSON', 'XML']
diff --git a/workers/doc_enricher.py b/workers/doc_enricher.py
index 1d36785247ff8ed2d0ed1cc56a3213b22c99d1d8..fc9c7ee4ce79d2d2a4e62fcc7d4792c06cddc4e5 100644
--- a/workers/doc_enricher.py
+++ b/workers/doc_enricher.py
@@ -138,7 +138,7 @@ def enrich_docs( channel, method, properties, body ):

     channel.exchange_declare(exchange=exchange, exchange_type='direct')

-    channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
+    dest_queue = channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
     channel.queue_bind(exchange=exchange, queue=doc_pages_to_process_qn, routing_key=doc_pages_to_process_rk)

@@ -178,7 +178,16 @@ def enrich_docs( channel, method, properties, body ):
             msg['body'] = doc_page

             the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
-
+            dest_queue = channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}, passive=True)
+            q_len = dest_queue.method.message_count
+            logging.info('Queue Size is %i for %s...' % ( q_len, doc_pages_to_process_qn))
+            while q_len > 50:
+                logging.info('Waiting for next queue to process elements...')
+                time.sleep(5)
+                dest_queue = channel.queue_declare(queue=doc_pages_to_process_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}, passive=True)
+                q_len = dest_queue.method.message_count
+                logging.info('Queue Size is %i for %s...' % ( q_len, doc_pages_to_process_qn))
+
             channel.basic_publish( exchange=exchange,
                 routing_key=doc_pages_to_process_rk,
                 body=the_body,
diff --git a/workers/doc_indexer.py b/workers/doc_indexer.py
index 1c202508e74ac884e646af8d1e48ce01f17a0e26..cdc13be4d7cb33672a0bce9b9ffeb82ee5281aa8 100644
--- a/workers/doc_indexer.py
+++ b/workers/doc_indexer.py
@@ -24,7 +24,8 @@ def tag_doc( the_doc ):

     #     tag_dict[tag] = False # isOpen?
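+    # tag the doc as open access unless its license matches one of the restricted licenses below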
-    if 'license' in the_doc['metadata-fr'].keys() and not any( [x in the_doc['metadata-fr']['license'] for x in ["Licence de réutilisation des données d'intérêt général"] ] ):
+    if 'license' in the_doc['metadata-fr'].keys() and not any( [x in the_doc['metadata-fr']['license'] for x in ["Licence de réutilisation des données d'intérêt général","Accord de réutilisation de données en accès privé et exclusif"] ] ):
         tag_dict['isOpenAccess'] = True
     else:
         tag_dict['isOpenAccess'] = False

@@ -115,7 +115,7 @@ def index_docs(channel, method, properties, body):

     rep = es.bulk(body=es_body)

-    t2 = time.time() 
+    t2 = time.time()

     if rep['errors'] == False:
         channel.basic_ack(delivery_tag = method.delivery_tag)
diff --git a/workers/doc_processor.py b/workers/doc_processor.py
index 0a0ea1f4ea1bf29f8b2e45da50b7cef7dc61c754..bffb7ab879dc70bff586b58ed84660092699dfc7 100644
--- a/workers/doc_processor.py
+++ b/workers/doc_processor.py
@@ -140,7 +140,7 @@ def process_docs( channel, method, properties, body ):
     del cfg['rabbitmq']['routing_key_2']

     channel.exchange_declare(exchange=exchange, exchange_type='direct')
-    channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
+    dest_queue = channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']})
     channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk)

     logging.info('[%6.2f%%] Processing %i docs for dataset %s...' % (progress_ratio*100, len(docs), docs[0]['slug']))
@@ -166,8 +166,17 @@ def process_docs( channel, method, properties, body ):

         the_body = msgpack.packb(msg, use_bin_type=True)

-
-
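+        # back-pressure: wait for the destination queue to drain below 50 pending messages before publishing more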
+        dest_queue = channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}, passive=True)
+        q_len = dest_queue.method.message_count
+        logging.info('Queue Size is %i for %s...' % ( q_len, docs_to_index_qn))
+        while q_len > 50:
+            logging.info('Waiting for next queue to process elements...')
+            time.sleep(5)
+            dest_queue = channel.queue_declare(queue=docs_to_index_qn, durable=True, arguments={'x-message-ttl' : cfg['rabbitmq']['ttl']}, passive=True)
+            q_len = dest_queue.method.message_count
+            logging.info('Queue Size is %i for %s...' % ( q_len, docs_to_index_qn))
+
         channel.basic_publish( exchange=exchange,
             routing_key=docs_to_index_rk,
             body=the_body,
diff --git a/workers/metadata_processor.py b/workers/metadata_processor.py
index 9cf4415ba7193756c959e96ca837269cee5db4d3..839dd2242d8de1ba848c6919a590f48e338055fd 100644
--- a/workers/metadata_processor.py
+++ b/workers/metadata_processor.py
@@ -65,16 +65,19 @@ def list_to_dictlist( the_input, the_context=None ):
         out_item = {}

         for k, line in enumerate(in_item_split):
-            if line != "":
-                if the_context != None:
-                    out_item[ dictionary[the_context][k] ] = line
+            if the_context and k in dictionary[the_context].keys():
+                if line != "":
+                    if the_context and dictionary[the_context].get(k):
+                        out_item[ dictionary[the_context][k] ] = line
+                    else:
+                        out_item[ k ] = line
                 else:
-                    out_item[ k ] = line
-            else:
-                out_item[ dictionary[the_context][k] ] = 'null'
+                    if dictionary[the_context].get(k):
+                        out_item[ dictionary[the_context][k] ] = 'null'

-        logging.debug(the_context)
-        logging.debug(out_item)
+        logging.debug("context : %s", the_context)
+        logging.debug("input: %s", in_item)
+        logging.debug("result : %s", out_item)

         # appending a hash value of the item can be useful at client-side,
         # as it allows one to identify entities that are common to multiple datasets...
@@ -184,7 +187,7 @@ def process_record( in_record, working_directory, credentials ):
         out_record['metadata-fr']['license'] = 'unknown'
         if 'legalConstraints' in out_record['metadata-fr'].keys():
             for el in out_record['metadata-fr']['legalConstraints']:
-                if "licence" in el.lower():
+                if "licence" in el.lower() or "accord" in el.lower():
                     out_record['metadata-fr']['license'] = el

     if 'resourceConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['resourceConstraints']) is str:
@@ -420,6 +423,6 @@ if __name__ == '__main__':
             logging.info('Waiting for RabbitMQ to be reachable...')
             time.sleep(5)
         except Exception as e:
-            logging.error(e)
+            logging.exception("General error (Aborting): %s", e)
             time.sleep(5)
             exit(1)