From 09d5621dbbe77c62d2a4d8c5eca653f7709e64ee Mon Sep 17 00:00:00 2001
From: Sébastien DA ROCHA
Date: Wed, 28 Oct 2020 19:03:28 +0100
Subject: [PATCH] Add type to property names and remove old code

---
 .gitignore                    |  3 ++
 workers/doc_enricher.py       |  1 +
 workers/metadata_processor.py | 84 +----------------------------------
 3 files changed, 6 insertions(+), 82 deletions(-)

diff --git a/.gitignore b/.gitignore
index 6957b4c..79e6d25 100644
--- a/.gitignore
+++ b/.gitignore
@@ -128,3 +128,6 @@ dmypy.json
 .pyre/
 # vim
 *.sw[op]
+
+# docker-compose
+docker-compose.override.yml
diff --git a/workers/doc_enricher.py b/workers/doc_enricher.py
index 55f1f77..1d36785 100644
--- a/workers/doc_enricher.py
+++ b/workers/doc_enricher.py
@@ -174,6 +174,7 @@ def enrich_docs( channel, method, properties, body ):
         msg['header']['progress_ratio'] = progress_ratio
         msg['header']['count'] = count
         msg['header']['serialized_deferred_count'] = serialized_deferred_count
+        msg['header']['wfs_info'] = wfs_info
         msg['body'] = doc_page
 
         the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
diff --git a/workers/metadata_processor.py b/workers/metadata_processor.py
index 8eebf8c..9cf4415 100644
--- a/workers/metadata_processor.py
+++ b/workers/metadata_processor.py
@@ -61,22 +61,14 @@ def list_to_dictlist( the_input, the_context=None ):
     the_output = []
 
     for in_item in the_input:
-        #print(i, link)
         in_item_split = in_item.split('|')
         out_item = {}
-        #print(in_item_split)
 
         for k, line in enumerate(in_item_split):
             if line != "":
-                # print(the_context, dictionary.keys())
-                # # out_item[ dictionary[the_context][k] ] = line
                 if the_context != None:
-                    # logging.debug('!'*80)
-                    # logging.debug( dictionary[the_context][k] )
-                    # logging.debug('x')
                     out_item[ dictionary[the_context][k] ] = line
                 else:
-                    # logging.debug('?'*80)
                     out_item[ k ] = line
             else:
                 out_item[ dictionary[the_context][k] ] = 'null'
@@ -99,14 +91,6 @@ def list_to_dictlist( the_input, the_context=None ):
 
         # In the following, we arrange things differently...
         if the_context == 'responsibleParty':
-            # try:
-            #     # the following applies to legacy metadata
-            #     parent_organisation, child_organisation = out_item['organisationName'].split('/')
-            #     parent_organisation = parent_organisation.strip()
-            #     child_organisation = child_organisation.strip()
-            # except:
-            #     pass
-
             try:
                 # the following applies to Dublin Core metadata
                 my_re = re.compile(r"(?P<organisationName>[^\(\)]+)(\((?P<individualName>.*)\))")
@@ -172,8 +156,6 @@ def process_record( in_record, working_directory, credentials ):
 
     if 'link' in out_record['metadata-fr'].keys():
-        #logging.debug(in_record['link'])
-        #exit(1)
         del out_record['metadata-fr']['link']
         tmp = list_to_dictlist(in_record['link'], 'link')#links
         out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp, credentials), credentials, working_directory )
@@ -261,14 +243,8 @@ def process_record( in_record, working_directory, credentials ):
     if 'updateFrequency' in in_record.keys() and type(in_record['updateFrequency']) is list:
         out_record['metadata-fr']['updateFrequency'] = in_record['updateFrequency'][0]
 
-    #pprint(out_record)
-    #out_records.append(out_record)
-    #print('-'*80)
     return out_record
-    #return out_records
-
-
 
 
 def callback( channel, method, properties, body ):
@@ -317,7 +293,6 @@ def callback( channel, method, properties, body ):
         the_full_title = out_record['metadata-fr']['title']
         the_slug = generate_slug(the_full_title)
         logging.info('Slug for "%s": %s' % (the_full_title, the_slug))
-        # the_type = out_record['metadata-fr']['type']
         last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
 
         # let's look for a WFS ressource to potentially fetch and index...
@@ -330,14 +305,10 @@ def callback( channel, method, properties, body ):
             if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']:
                 logging.debug('EUREKA : found a WFS ressource!')
                 wfs_found = True
-                #documents_to_index = []
-                # OLD featurePages = getWFS(metadata_record['metadata-fr']['link'][key])
-                #featurePages = getWFS(link)
                 #
                 full_version = out_record.copy() # including metadata AND data
                 full_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.full'
                 full_version['type'] = out_record['metadata-fr']['type']
                 full_version['slug'] = the_slug
-                # full_version['last_update'] will be added by the doc enricher
                 # adding information about fields and types ---------------------------------------------------------------------------
                 dbname = link['url'].split('/')[-1]
@@ -345,14 +316,12 @@ def callback( channel, method, properties, body ):
                 if lookup_key in field_catalog.keys():
                     full_version['fields'] = { "list": field_catalog[lookup_key]['fields'], "types": field_catalog[lookup_key]['types'] }
                 else:
-                    logging.warning("No information about the needed fields and types found in the field catalog: generating a new catalog...")
+                    logging.warning("No information about the needed fields and types found (%s) in the field catalog: generating a new catalog...", lookup_key)
                     field_type_detector(cfg)
                     channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
                     return
                 # ---------------------------------------------------------------------------------------------------------------------
 
-                #create_reindex_task(cfg, channel, full_version['uuid'])
-
                 msg = dict()
                 msg['header'] = dict()
                 msg['header']['cfg'] = cfg
@@ -377,8 +346,6 @@ def callback( channel, method, properties, body ):
 
         meta_version['slug'] = the_slug
         meta_version['last_update'] = last_update
-        #create_reindex_task(cfg, channel, meta_version['uuid'])
-
         msg = dict()
         msg['header'] = dict()
         msg['header']['cfg'] = cfg
@@ -399,64 +366,17 @@ def callback( channel, method, properties, body ):
                            properties=pika.BasicProperties(delivery_mode = 2)
                          )
     channel.basic_ack(delivery_tag = method.delivery_tag)
-    #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
-
     return
-
-
-# def create_reindex_task( the_cfg, the_channel, the_uuid ):
-#
-#     the_cfg = deepcopy(the_cfg)
-#
-#     #connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
-#     #channel = connection.channel()
-#     exchange = the_cfg['rabbitmq']['exchange']
-#     queue_name = the_cfg['rabbitmq']['queue_name_4']
-#     routing_key = the_cfg['rabbitmq']['routing_key_4']
-#
-#     del the_cfg['rabbitmq']['queue_name_4']
-#     del the_cfg['rabbitmq']['routing_key_4']
-#
-#     the_channel.exchange_declare(exchange=exchange, exchange_type='direct')
-#     the_channel.queue_declare(queue=queue_name, durable=True)
-#     the_channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
-#
-#     msg = dict()
-#
-#     msg['header'] = dict()
-#     msg['header']['cfg'] = the_cfg
-#     msg['body'] = the_uuid
-#
-#     the_body = msgpack.packb(msg, use_bin_type=False)
-#
-#     the_channel.basic_publish( exchange=exchange,
-#                                routing_key=routing_key,
-#                                body=the_body,
-#                                properties=pika.BasicProperties(delivery_mode = 2)
-#                              )
-#
-#     #connection.close()
-#
-#     return
-
 
 def main(cfg):
 
-    #from lib.close_connection import on_timeout
-
-    # logging.debug(cfg)
-    #global connection
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
-    # timeout = 5
-    # connection.add_timeout(timeout, on_timeout(connection))
    channel = connection.channel()
-    # exchange = cfg['rabbitmq_exchange']
-
    # the queue this program will consume messages from:
    metadata_records_to_process_qn = cfg['rabbitmq_queue']

    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_message_callback=lambda ch, method, properties, body: callback(ch, method, properties, body),
-                          queue=metadata_records_to_process_qn)#, no_ack=True)
+                          queue=metadata_records_to_process_qn)

    channel.start_consuming()
--
GitLab
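Note on the messaging convention this patch touches: each worker builds a dict with a 'header' (the pipeline cfg plus bookkeeping fields, now including the wfs_info entry added in doc_enricher.py) and a 'body' payload, serializes it with msgpack, and publishes it as a persistent RabbitMQ message via pika. The sketch below is illustrative only; the exchange, queue and routing-key names and the encode_datetime helper are assumptions, not taken from this patch.

import datetime
import msgpack
import pika

def encode_datetime(obj):
    # assumed helper: render datetimes as strings so msgpack can serialize them
    if isinstance(obj, datetime.datetime):
        return obj.strftime("%Y-%m-%dT%H:%M:%SZ")
    raise TypeError('cannot serialize %r' % obj)

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port=5672))
channel = connection.channel()
channel.exchange_declare(exchange='download', exchange_type='direct')   # assumed names
channel.queue_declare(queue='doc_pages_to_index', durable=True)
channel.queue_bind(exchange='download', queue='doc_pages_to_index', routing_key='doc_pages')

msg = dict()
msg['header'] = dict()
msg['header']['cfg'] = {}                            # pipeline config travels with every message
msg['header']['wfs_info'] = {'typename': 'example'}  # the field this patch forwards in the header
msg['body'] = {'title': 'a doc page', 'last_update': datetime.datetime.utcnow()}

the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
channel.basic_publish(exchange='download',
                      routing_key='doc_pages',
                      body=the_body,
                      properties=pika.BasicProperties(delivery_mode=2))  # delivery_mode=2: persistent
connection.close()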