Commit 09d5621d authored by Sébastien DA ROCHA

Add type to properties names and remove old code

parent 5e4eee13
Pipeline #8485 passed with stage in 10 seconds
@@ -128,3 +128,6 @@ dmypy.json
.pyre/
# vim
*.sw[op]
# docker-compose
docker-compose.override.yml
@@ -174,6 +174,7 @@ def enrich_docs( channel, method, properties, body ):
msg['header']['progress_ratio'] = progress_ratio
msg['header']['count'] = count
msg['header']['serialized_deferred_count'] = serialized_deferred_count
msg['header']['wfs_info'] = wfs_info
msg['body'] = doc_page
the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
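The new `wfs_info` entry rides along in the message header, and the whole envelope is serialized with msgpack. `use_bin_type=True` keeps str and bytes distinct on the wire, while the `default=` hook handles types msgpack cannot pack natively, such as datetimes. A minimal sketch of what `encode_datetime` and the consumer-side round trip could look like; the ISO-8601 convention here is an assumption, not the project's confirmed implementation:

```python
import datetime
import msgpack

def encode_datetime(obj):
    # Assumed convention: serialize datetimes as ISO-8601 strings.
    # msgpack only calls this hook for types it cannot pack natively.
    if isinstance(obj, datetime.datetime):
        return obj.isoformat()
    raise TypeError("Cannot serialize %r" % obj)

msg = {'header': {'last_update': datetime.datetime.utcnow()}, 'body': []}
packed = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)

# Consumer side: raw=False decodes keys and values back to Python strings.
roundtrip = msgpack.unpackb(packed, raw=False)
```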
@@ -61,22 +61,14 @@ def list_to_dictlist( the_input, the_context=None ):
the_output = []
for in_item in the_input:
#print(i, link)
in_item_split = in_item.split('|')
out_item = {}
#print(in_item_split)
for k, line in enumerate(in_item_split):
if line != "":
# print(the_context, dictionary.keys())
# # out_item[ dictionary[the_context][k] ] = line
if the_context != None:
# logging.debug('!'*80)
# logging.debug( dictionary[the_context][k] )
# logging.debug('x')
out_item[ dictionary[the_context][k] ] = line
else:
# logging.debug('?'*80)
out_item[ k ] = line
else:
out_item[ dictionary[the_context][k] ] = 'null'
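What survives of `list_to_dictlist` splits each pipe-delimited string and maps the k-th field through the module-level `dictionary` for the given context, falling back to the positional index when no context is supplied, and to the string 'null' for empty fields. A self-contained sketch of that behavior, with invented `dictionary` contents:

```python
# Hypothetical excerpt of the module-level lookup table.
dictionary = {'link': ['name', 'description', 'url', 'protocol']}

def list_to_dictlist(the_input, the_context=None):
    the_output = []
    for in_item in the_input:
        out_item = {}
        for k, line in enumerate(in_item.split('|')):
            if line != "":
                # Named key when a context is known, positional index otherwise.
                key = dictionary[the_context][k] if the_context is not None else k
                out_item[key] = line
            else:
                # Mirrors the original: empty fields presume a known context.
                out_item[dictionary[the_context][k]] = 'null'
        the_output.append(out_item)
    return the_output

print(list_to_dictlist(['roads||https://example.org/wfs|OGC:WFS'], 'link'))
# -> [{'name': 'roads', 'description': 'null',
#      'url': 'https://example.org/wfs', 'protocol': 'OGC:WFS'}]
```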
@@ -99,14 +91,6 @@ def list_to_dictlist( the_input, the_context=None ):
# In the following, we arrange things differently...
if the_context == 'responsibleParty':
# try:
# # the following applies to legacy metadata
# parent_organisation, child_organisation = out_item['organisationName'].split('/')
# parent_organisation = parent_organisation.strip()
# child_organisation = child_organisation.strip()
# except:
# pass
try:
# the following applies to Dublin Core metadata
my_re = re.compile(r"(?P<organisationName>[^\(\)]+)(\((?P<individualName>.*)\))")
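The Dublin Core branch leans on a single regex with named groups to split an 'Organisation (Individual)' string into its two parts. A quick illustration on a made-up value:

```python
import re

my_re = re.compile(r"(?P<organisationName>[^\(\)]+)(\((?P<individualName>.*)\))")

# Hypothetical input in the 'Organisation (Individual)' shape the regex expects.
m = my_re.match("Métropole de Lyon (Jane Doe)")
if m:
    print(m.group('organisationName').strip())  # 'Métropole de Lyon'
    print(m.group('individualName'))            # 'Jane Doe'
```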
@@ -172,8 +156,6 @@ def process_record( in_record, working_directory, credentials ):
if 'link' in out_record['metadata-fr'].keys():
#logging.debug(in_record['link'])
#exit(1)
del out_record['metadata-fr']['link']
tmp = list_to_dictlist(in_record['link'], 'link')#links
out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp, credentials), credentials, working_directory )
@@ -261,14 +243,8 @@ def process_record( in_record, working_directory, credentials ):
if 'updateFrequency' in in_record.keys() and type(in_record['updateFrequency']) is list:
out_record['metadata-fr']['updateFrequency'] = in_record['updateFrequency'][0]
#pprint(out_record)
#out_records.append(out_record)
#print('-'*80)
return out_record
#return out_records
def callback( channel, method, properties, body ):
@@ -317,7 +293,6 @@ def callback( channel, method, properties, body ):
the_full_title = out_record['metadata-fr']['title']
the_slug = generate_slug(the_full_title)
logging.info('Slug for "%s": %s' % (the_full_title, the_slug))
# the_type = out_record['metadata-fr']['type']
last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
# let's look for a WFS resource to potentially fetch and index...
@@ -330,14 +305,10 @@ def callback( channel, method, properties, body ):
if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']:
logging.debug('EUREKA: found a WFS resource!')
wfs_found = True
#documents_to_index = []
# OLD featurePages = getWFS(metadata_record['metadata-fr']['link'][key])
#featurePages = getWFS(link) #
full_version = out_record.copy() # including metadata AND data
full_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.full'
full_version['type'] = out_record['metadata-fr']['type']
full_version['slug'] = the_slug
# full_version['last_update'] will be added by the doc enricher
# adding information about fields and types ---------------------------------------------------------------------------
dbname = link['url'].split('/')[-1]
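The WFS detection itself is a straightforward filter over the record's link dicts, keyed on the service name and the data.grandlyon.com host. A condensed sketch with invented sample links:

```python
# Invented sample of the 'link' entries produced by list_to_dictlist.
links = [
    {'service': 'WMS', 'url': 'https://download.data.grandlyon.com/wms/grandlyon'},
    {'service': 'WFS', 'url': 'https://download.data.grandlyon.com/wfs/grandlyon'},
]

wfs_links = [link for link in links
             if link.get('service') == 'WFS' and 'data.grandlyon.com' in link['url']]

for link in wfs_links:
    dbname = link['url'].split('/')[-1]  # e.g. 'grandlyon', as in the diff above
    print(dbname)
```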
@@ -345,14 +316,12 @@ def callback( channel, method, properties, body ):
if lookup_key in field_catalog.keys():
full_version['fields'] = { "list": field_catalog[lookup_key]['fields'], "types": field_catalog[lookup_key]['types'] }
else:
logging.warning("No information about the needed fields and types found in the field catalog: generating a new catalog...")
logging.warning("No information about the needed fields and types found (%s) in the field catalog: generating a new catalog...", lookup_key)
field_type_detector(cfg)
channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
return
# ---------------------------------------------------------------------------------------------------------------------
#create_reindex_task(cfg, channel, full_version['uuid'])
msg = dict()
msg['header'] = dict()
msg['header']['cfg'] = cfg
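When the field catalog lacks the needed entry, the code above regenerates the catalog and then negatively acknowledges the delivery with requeue=1, so RabbitMQ redelivers the same message once the catalog exists; combined with prefetch_count=1 this amounts to a simple retry loop. A stripped-down sketch of that pattern (handle_message and its catalog argument are illustrative, not the project's API):

```python
import logging

def handle_message(channel, method, body, catalog):
    lookup_key = body.decode()  # illustrative: the real key comes from the record
    if lookup_key not in catalog:
        logging.warning("Key %s missing: rebuilding catalog and requeueing...", lookup_key)
        # basic_nack with requeue=1 puts the message back on the queue,
        # so it is redelivered after the catalog has been regenerated.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=1)
        return
    # ... process, publish downstream, then acknowledge:
    channel.basic_ack(delivery_tag=method.delivery_tag)
```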
@@ -377,8 +346,6 @@ def callback( channel, method, properties, body ):
meta_version['slug'] = the_slug
meta_version['last_update'] = last_update
#create_reindex_task(cfg, channel, meta_version['uuid'])
msg = dict()
msg['header'] = dict()
msg['header']['cfg'] = cfg
@@ -399,64 +366,17 @@ def callback( channel, method, properties, body ):
properties=pika.BasicProperties(delivery_mode = 2) )
channel.basic_ack(delivery_tag = method.delivery_tag)
#channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
return
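Throughout the callback the ordering is publish first, ack second: outgoing messages carry delivery_mode=2 so the broker persists them to disk, and the upstream delivery is acknowledged only afterwards, so a crash in between causes redelivery rather than loss. A minimal sketch with invented exchange and routing-key names:

```python
import pika

def forward(channel, method, packed_body):
    # Invented names; the real ones come from the rabbitmq section of cfg.
    exchange, routing_key = 'download_exchange', 'doc_pages_to_process'

    channel.basic_publish(exchange=exchange,
                          routing_key=routing_key,
                          body=packed_body,
                          properties=pika.BasicProperties(delivery_mode=2))  # persistent
    # Ack the upstream delivery only after the downstream publish succeeded.
    channel.basic_ack(delivery_tag=method.delivery_tag)
```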
# def create_reindex_task( the_cfg, the_channel, the_uuid ):
#
# the_cfg = deepcopy(the_cfg)
#
# #connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port']))
# #channel = connection.channel()
# exchange = the_cfg['rabbitmq']['exchange']
# queue_name = the_cfg['rabbitmq']['queue_name_4']
# routing_key = the_cfg['rabbitmq']['routing_key_4']
#
# del the_cfg['rabbitmq']['queue_name_4']
# del the_cfg['rabbitmq']['routing_key_4']
#
# the_channel.exchange_declare(exchange=exchange, exchange_type='direct')
# the_channel.queue_declare(queue=queue_name, durable=True)
# the_channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key)
#
# msg = dict()
#
# msg['header'] = dict()
# msg['header']['cfg'] = the_cfg
# msg['body'] = the_uuid
#
# the_body = msgpack.packb(msg, use_bin_type=False)
#
# the_channel.basic_publish( exchange=exchange,
# routing_key=routing_key,
# body=the_body,
# properties=pika.BasicProperties(delivery_mode = 2)
# )
#
# #connection.close()
#
# return
def main(cfg):
#from lib.close_connection import on_timeout
# logging.debug(cfg)
#global connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
# timeout = 5
# connection.add_timeout(timeout, on_timeout(connection))
channel = connection.channel()
# exchange = cfg['rabbitmq_exchange']
# the queue this program will consume messages from:
metadata_records_to_process_qn = cfg['rabbitmq_queue']
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message_callback=lambda ch, method, properties, body: callback(ch, method, properties, body),
queue=metadata_records_to_process_qn)#, no_ack=True)
queue=metadata_records_to_process_qn)
channel.start_consuming()
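The surviving main() is the canonical pika blocking-consumer setup: one connection, one channel, a prefetch of 1 so each worker holds a single unacked message, and the callback bound to the configured queue. A self-contained sketch under an assumed cfg shape; note that passing the callback directly is equivalent to the lambda wrapper kept in the diff:

```python
import pika

# Assumed cfg keys, mirroring those used in the diff.
cfg = {'rabbitmq_host': 'localhost', 'rabbitmq_port': 5672,
       'rabbitmq_queue': 'metadata_records_to_process'}

def callback(channel, method, properties, body):
    print("received %d bytes" % len(body))
    channel.basic_ack(delivery_tag=method.delivery_tag)

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
channel = connection.channel()
channel.basic_qos(prefetch_count=1)  # one in-flight message per worker
channel.basic_consume(on_message_callback=callback, queue=cfg['rabbitmq_queue'])
channel.start_consuming()
```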