diff --git a/lib/rabbit_session.py b/lib/rabbit_session.py index 17f924f8f64c858fd0c6f554702ebabd839aedfc..65c8fcd47a8bb74d9cfa899952cc905c4460910d 100644 --- a/lib/rabbit_session.py +++ b/lib/rabbit_session.py @@ -25,6 +25,8 @@ class RabbitSession: def __enter__(self): self.connection = self._create_connection() + # timeout = 5 + # connection.add_timeout(timeout, on_timeout(connection)) return self def __exit__(self, *args): diff --git a/workers/metadata-processor.py b/workers/metadata-processor.py index ec566d498b6e1c2146b7d548588ad48dd1b7fce2..4528b2355d05833a70e5a6a5a51e470199d62a54 100644 --- a/workers/metadata-processor.py +++ b/workers/metadata-processor.py @@ -20,486 +20,520 @@ from lib.my_logging import logging from lib.fix_links import fix_links from lib.enrich_links import enrich_links from lib.generate_slug import generate_slug - -from lib.rabbit_session import RabbitSession from lib.log_message import LogMessage -class MetadataProcessor: - - def __init__(self, cfg): - self.cfg = cfg - self.rabbit = None - - def list_to_dictlist( self, the_input, the_context=None ): - """ - This function transforms some of the lists (responsibleParty, link, ...) returned by GeoNetwork's q API into a list of dicts. - """ - - dictionary = dict() - - dictionary['responsibleParty'] = { 0: "role", - 1: "appliesTo", - 2: "organisationName", - 3: "logo", - 4: "email", - 5: "individualName", - 6: "positionName", - 7: "address", - 8: "telephone" } - - - dictionary['link'] = { 0: "name", - 1: "description", - 2: "url", - 3: "protocol", - 4: "content-type", - 5: "unknown" } - - dictionary['image'] = {0: "type", 1: "url"} - - dictionary['inspirethemewithac'] = {0: "id", 1: "label"} - - dictionary['geoBox'] = {0: "westBoundLongitude", 1: "southBoundLatitude", 2: "eastBoundLongitude", 3: "northBoundLatitude"} - - # in case the input is not already a list, we convert it to a list - if type( the_input ) is str: - the_input = [the_input] - - the_output = [] - for in_item in the_input: - #print(i, link) - in_item_split = in_item.split('|') - - out_item = {} - #print(in_item_split) - for k, line in enumerate(in_item_split): - if line != "": - # print(the_context, dictionary.keys()) - # # out_item[ dictionary[the_context][k] ] = line - if the_context != None: - # logging.debug('!'*80) - # logging.debug( dictionary[the_context][k] ) - # logging.debug('x') - out_item[ dictionary[the_context][k] ] = line - else: - # logging.debug('?'*80) - out_item[ k ] = line +def list_to_dictlist( the_input, the_context=None ): + """ + This function transforms some of the lists (responsibleParty, link, ...) returned by GeoNetwork's q API into a list of dicts. 
+ """ + + dictionary = dict() + + dictionary['responsibleParty'] = { 0: "role", + 1: "appliesTo", + 2: "organisationName", + 3: "logo", + 4: "email", + 5: "individualName", + 6: "positionName", + 7: "address", + 8: "telephone" } + + + dictionary['link'] = { 0: "name", + 1: "description", + 2: "url", + 3: "protocol", + 4: "content-type", + 5: "unknown" } + + dictionary['image'] = {0: "type", 1: "url"} + + dictionary['inspirethemewithac'] = {0: "id", 1: "label"} + + dictionary['geoBox'] = {0: "westBoundLongitude", 1: "southBoundLatitude", 2: "eastBoundLongitude", 3: "northBoundLatitude"} + + # in case the input is not already a list, we convert it to a list + if type( the_input ) is str: + the_input = [the_input] + + the_output = [] + for in_item in the_input: + #print(i, link) + in_item_split = in_item.split('|') + + out_item = {} + #print(in_item_split) + for k, line in enumerate(in_item_split): + if line != "": + # print(the_context, dictionary.keys()) + # # out_item[ dictionary[the_context][k] ] = line + if the_context != None: + # logging.debug('!'*80) + # logging.debug( dictionary[the_context][k] ) + # logging.debug('x') + out_item[ dictionary[the_context][k] ] = line else: - out_item[ dictionary[the_context][k] ] = 'null' + # logging.debug('?'*80) + out_item[ k ] = line + else: + out_item[ dictionary[the_context][k] ] = 'null' - logging.debug(the_context) - logging.debug(out_item) + logging.debug(the_context) + logging.debug(out_item) - # appending a hash value of the item can be useful at client-side, - # as it allows one to identify entities that are common to multiple datasets... - if the_context == 'responsibleParty': - tmp = out_item.copy() - del tmp['appliesTo'] # as a matter of facts, 'appliesTo' isn't really a property of the responsibleParty - md5 = hashlib.md5( json.dumps(tmp, sort_keys=True).encode("utf-8") ).hexdigest() - else: - md5 = hashlib.md5( json.dumps(out_item, sort_keys=True).encode("utf-8") ).hexdigest() + # appending a hash value of the item can be useful at client-side, + # as it allows one to identify entities that are common to multiple datasets... + if the_context == 'responsibleParty': + tmp = out_item.copy() + del tmp['appliesTo'] # as a matter of facts, 'appliesTo' isn't really a property of the responsibleParty + md5 = hashlib.md5( json.dumps(tmp, sort_keys=True).encode("utf-8") ).hexdigest() + else: + md5 = hashlib.md5( json.dumps(out_item, sort_keys=True).encode("utf-8") ).hexdigest() + + # In the case of dataset produced by the Métropole de Lyon, + # the "organisationName" contains "Métropole de Lyon" + # followed by the name of the direction. + # In the following, we arrange things differently... + if the_context == 'responsibleParty': + + # try: + # # the following applies to legacy metadata + # parent_organisation, child_organisation = out_item['organisationName'].split('/') + # parent_organisation = parent_organisation.strip() + # child_organisation = child_organisation.strip() + # except: + # pass - # In the case of dataset produced by the Métropole de Lyon, - # the "organisationName" contains "Métropole de Lyon" - # followed by the name of the direction. - # In the following, we arrange things differently... 
- if the_context == 'responsibleParty': + try: + # the following applies to Dublin Core metadata + my_re = re.compile(r"(?P<organisationName>[^\(\)]+)(\((?P<individualName>.*)\))") + parent_organisation = my_re.match(out_item['organisationName']).groupdict()['organisationName'].strip() + child_organisation = my_re.match(out_item['organisationName']).groupdict()['individualName'].strip() + except: + parent_organisation, child_organisation = out_item['organisationName'], None + parent_organisation = parent_organisation.strip() + child_organisation = None + + # temporary patch for ATMO + if 'Atmo Auvergne-Rhône-Alpes' in parent_organisation: + parent_organisation = parent_organisation.replace('Atmo Auvergne-Rhône-Alpes', 'ATMO Auvergne-Rhône-Alpes') + if child_organisation != None and 'Atmo Auvergne-Rhône-Alpes' in child_organisation: + child_organisation = child_organisation.replace('Atmo Auvergne-Rhône-Alpes', 'ATMO Auvergne-Rhône-Alpes') + # ------------------------------------------------------------------ + + out_item['organisationName'] = parent_organisation + + if child_organisation != None: + if 'individualName' in out_item.keys(): + out_item['individualName'] = child_organisation + ' / ' + out_item['individualName'] + else: + out_item['individualName'] = child_organisation - # try: - # # the following applies to legacy metadata - # parent_organisation, child_organisation = out_item['organisationName'].split('/') - # parent_organisation = parent_organisation.strip() - # child_organisation = child_organisation.strip() - # except: - # pass - - try: - # the following applies to Dublin Core metadata - my_re = re.compile(r"(?P<organisationName>[^\(\)]+)(\((?P<individualName>.*)\))") - parent_organisation = my_re.match(out_item['organisationName']).groupdict()['organisationName'].strip() - child_organisation = my_re.match(out_item['organisationName']).groupdict()['individualName'].strip() - except: - parent_organisation, child_organisation = out_item['organisationName'], None - parent_organisation = parent_organisation.strip() - child_organisation = None - - # temporary patch for ATMO - if 'Atmo Auvergne-Rhône-Alpes' in parent_organisation: - parent_organisation = parent_organisation.replace('Atmo Auvergne-Rhône-Alpes', 'ATMO Auvergne-Rhône-Alpes') - if child_organisation != None and 'Atmo Auvergne-Rhône-Alpes' in child_organisation: - child_organisation = child_organisation.replace('Atmo Auvergne-Rhône-Alpes', 'ATMO Auvergne-Rhône-Alpes') - # ------------------------------------------------------------------ - - out_item['organisationName'] = parent_organisation - - if child_organisation != None: - if 'individualName' in out_item.keys(): - out_item['individualName'] = child_organisation + ' / ' + out_item['individualName'] - else: - out_item['individualName'] = child_organisation - - - the_output.append( {'md5': md5, **out_item}) - - if the_context == 'geoBox': - - polygon = [] - polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) - polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) - polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['northBoundLatitude'])]) - polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['northBoundLatitude'])]) - polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) - - the_output = {'type': 'Polygon', 'coordinates': [polygon]} - - return the_output - - 
- def process_record( self, in_record, working_directory, credentials ): - - - the_uuid = in_record['geonet:info']['uuid'] - logging.info("Processing record %s..." % the_uuid) - - out_record = {} - # all the content of the original record in "mounted" at "metadata-fr" - out_record['metadata-fr'] = in_record.copy() - - if '_locale' in out_record['metadata-fr'].keys(): - del out_record['metadata-fr']['_locale'] - out_record['metadata-fr']['locale'] = in_record['_locale'] - - # we transform some lists into list of dictionaries... - if 'responsibleParty' in out_record['metadata-fr'].keys(): - del out_record['metadata-fr']['responsibleParty'] - tmp = self.list_to_dictlist(in_record['responsibleParty'], 'responsibleParty') - # remove the entry for which appliesTo = 'metadata' - out_record['metadata-fr']['responsibleParty'] = [x for x in tmp if x['appliesTo'] == 'resource'] - - - if 'link' in out_record['metadata-fr'].keys(): - #logging.debug(in_record['link']) - #exit(1) - del out_record['metadata-fr']['link'] - tmp = self.list_to_dictlist(in_record['link'], 'link')#links - out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp, credentials), working_directory ) - - if 'userinfo' in out_record['metadata-fr'].keys(): - del out_record['metadata-fr']['userinfo'] - out_record['metadata-fr']['userinfo'] = self.list_to_dictlist(in_record['userinfo'])#links - - if 'category' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['category']) is str: - del out_record['metadata-fr']['category'] - out_record['metadata-fr']['category'] = [in_record['category']] - - if 'topicCat'in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['topicCat']) is str: - del out_record['metadata-fr']['topicCat'] - out_record['metadata-fr']['topicCat'] = [in_record['topicCat']] - - if 'keyword' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['keyword']) is str: - del out_record['metadata-fr']['keyword'] - out_record['metadata-fr']['keyword'] = [in_record['keyword']] - - if 'legalConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['legalConstraints']) is str: - del out_record['metadata-fr']['legalConstraints'] - out_record['metadata-fr']['legalConstraints'] = [in_record['legalConstraints']] - # adding a 'license' field - out_record['metadata-fr']['license'] = 'unknown' - if 'legalConstraints' in out_record['metadata-fr'].keys(): - for el in out_record['metadata-fr']['legalConstraints']: - if "licence" in el.lower(): - out_record['metadata-fr']['license'] = el + the_output.append( {'md5': md5, **out_item}) - if 'resourceConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['resourceConstraints']) is str: - del out_record['metadata-fr']['resourceConstraints'] - out_record['metadata-fr']['resourceConstraints'] = [in_record['resourceConstraints']] + if the_context == 'geoBox': - if 'geoBox' in out_record['metadata-fr'].keys(): - del out_record['metadata-fr']['geoBox'] - out_record['metadata-fr']['bbox'] = self.list_to_dictlist(in_record['geoBox'], 'geoBox') - else: - pass # it means that we are treating a non-geographic dataset + polygon = [] + polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) + polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) + polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['northBoundLatitude'])]) + 
polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['northBoundLatitude'])]) + polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) - if 'inspirethemewithac' in out_record['metadata-fr']: - del out_record['metadata-fr']['inspirethemewithac'] - out_record['metadata-fr']['inspirethemewithac'] = self.list_to_dictlist(in_record['inspirethemewithac'], 'inspirethemewithac') - else: - pass # it means that we are treating a non-geographic dataset + the_output = {'type': 'Polygon', 'coordinates': [polygon]} - if 'image' in out_record['metadata-fr'].keys(): - del out_record['metadata-fr']['image'] - out_record['metadata-fr']['image'] = self.list_to_dictlist(in_record['image'], 'image') - else: - pass # it means that we are treating a non-geographic dataset + return the_output +def process_record( in_record, working_directory, credentials ): - properties_to_convert_to_date = [] - properties_to_convert_to_date.append('creationDate') - for prop in properties_to_convert_to_date: + the_uuid = in_record['geonet:info']['uuid'] + logging.info("Processing record %s..." % the_uuid) - if prop in out_record['metadata-fr'].keys(): - try: - out_record['metadata-fr'][prop] = str(parse(in_record[prop])) - except: - #TODO logging.error('Unable to parse date in metadata: %s' % in_record[prop]) - del out_record['metadata-fr'][prop] + out_record = {} + # all the content of the original record in "mounted" at "metadata-fr" + out_record['metadata-fr'] = in_record.copy() + if '_locale' in out_record['metadata-fr'].keys(): + del out_record['metadata-fr']['_locale'] + out_record['metadata-fr']['locale'] = in_record['_locale'] - # let's delete some attributes which are very specific to GeoNetwork - attribs_to_delete = ['userinfo', 'isHarvested', 'isTemplate', 'owner', \ - 'displayOrder', 'publishedForGroup', 'valid', 'docLocale', \ - 'popularity', 'mdLanguage', 'root', 'rating', 'source', \ - 'defaultTitle', 'datasetLang', 'geoDesc', 'locale', 'logo'] + # we transform some lists into list of dictionaries... + if 'responsibleParty' in out_record['metadata-fr'].keys(): + del out_record['metadata-fr']['responsibleParty'] + tmp = list_to_dictlist(in_record['responsibleParty'], 'responsibleParty') + # remove the entry for which appliesTo = 'metadata' + out_record['metadata-fr']['responsibleParty'] = [x for x in tmp if x['appliesTo'] == 'resource'] - for attrib in attribs_to_delete: - try: - del out_record['metadata-fr'][attrib] - except: - pass - if 'idxMsg' in in_record.keys(): - del out_record['metadata-fr']['idxMsg'] + if 'link' in out_record['metadata-fr'].keys(): + #logging.debug(in_record['link']) + #exit(1) + del out_record['metadata-fr']['link'] + tmp = list_to_dictlist(in_record['link'], 'link')#links + out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp, credentials), working_directory ) - # let's take just one value among those that are in the updateFrequency (hoping that it is representative...) 
- if 'updateFrequency' in in_record.keys() and type(in_record['updateFrequency']) is list: - out_record['metadata-fr']['updateFrequency'] = in_record['updateFrequency'][0] + if 'userinfo' in out_record['metadata-fr'].keys(): + del out_record['metadata-fr']['userinfo'] + out_record['metadata-fr']['userinfo'] = list_to_dictlist(in_record['userinfo'])#links - #pprint(out_record) - #out_records.append(out_record) - #print('-'*80) - return out_record + if 'category' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['category']) is str: + del out_record['metadata-fr']['category'] + out_record['metadata-fr']['category'] = [in_record['category']] - #return out_records + if 'topicCat'in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['topicCat']) is str: + del out_record['metadata-fr']['topicCat'] + out_record['metadata-fr']['topicCat'] = [in_record['topicCat']] - def callback(self, channel, method, properties, body): + if 'keyword' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['keyword']) is str: + del out_record['metadata-fr']['keyword'] + out_record['metadata-fr']['keyword'] = [in_record['keyword']] - decoded_body = msgpack.unpackb(body, raw=False) + if 'legalConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['legalConstraints']) is str: + del out_record['metadata-fr']['legalConstraints'] + out_record['metadata-fr']['legalConstraints'] = [in_record['legalConstraints']] + # adding a 'license' field + out_record['metadata-fr']['license'] = 'unknown' + if 'legalConstraints' in out_record['metadata-fr'].keys(): + for el in out_record['metadata-fr']['legalConstraints']: + if "licence" in el.lower(): + out_record['metadata-fr']['license'] = el - # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], - # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], - step='metadata-processor', - status='Starting...', - uuid_prefix='meta', - info='no info' - ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ + if 'resourceConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['resourceConstraints']) is str: + del out_record['metadata-fr']['resourceConstraints'] + out_record['metadata-fr']['resourceConstraints'] = [in_record['resourceConstraints']] + if 'geoBox' in out_record['metadata-fr'].keys(): + del out_record['metadata-fr']['geoBox'] + out_record['metadata-fr']['bbox'] = list_to_dictlist(in_record['geoBox'], 'geoBox') + else: + pass # it means that we are treating a non-geographic dataset - cfg = decoded_body['header']['cfg'] + if 'inspirethemewithac' in out_record['metadata-fr']: + del out_record['metadata-fr']['inspirethemewithac'] + out_record['metadata-fr']['inspirethemewithac'] = list_to_dictlist(in_record['inspirethemewithac'], 'inspirethemewithac') + else: + pass # it means that we are treating a non-geographic dataset - working_directory = os.path.join(parentDir, cfg['session']['working_directory']) - credentials = cfg['credentials'] + if 'image' in out_record['metadata-fr'].keys(): + del out_record['metadata-fr']['image'] + out_record['metadata-fr']['image'] = list_to_dictlist(in_record['image'], 'image') + else: + pass # it means that we are treating a non-geographic dataset - in_record = decoded_body['body'] - - out_record = self.process_record( in_record, 
working_directory, credentials ) - # initialize RabbitMQ channel - docs_to_index_rk = cfg['rabbitmq']['routing_key_2'] - docs_to_index_qn = cfg['rabbitmq']['queue_name_2'] - # - docs_to_enrich_rk = cfg['rabbitmq']['routing_key_3'] - docs_to_enrich_qn = cfg['rabbitmq']['queue_name_3'] - - # the following information is no longer needed, once that this script reads them - del cfg['rabbitmq']['routing_key_3'] - del cfg['rabbitmq']['queue_name_3'] - exchange = cfg['rabbitmq']['exchange'] - # channel.exchange_declare(exchange=exchange, exchange_type='direct') - # - # channel.queue_declare(queue=docs_to_index_qn, durable=True) - # channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk) - # channel.queue_declare(queue=docs_to_enrich_qn, durable=True) - # channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk) + properties_to_convert_to_date = [] + properties_to_convert_to_date.append('creationDate') - # load field catalog - filename = os.path.join( parentDir, cfg['session']['working_directory'], 'field_catalog_by_dbschematable.json' ) - with open(filename, 'r') as fp: - field_catalog = json.load(fp) + for prop in properties_to_convert_to_date: + if prop in out_record['metadata-fr'].keys(): + try: + out_record['metadata-fr'][prop] = str(parse(in_record[prop])) + except: + #TODO logging.error('Unable to parse date in metadata: %s' % in_record[prop]) + del out_record['metadata-fr'][prop] + + + # let's delete some attributes which are very specific to GeoNetwork + attribs_to_delete = ['userinfo', 'isHarvested', 'isTemplate', 'owner', \ + 'displayOrder', 'publishedForGroup', 'valid', 'docLocale', \ + 'popularity', 'mdLanguage', 'root', 'rating', 'source', \ + 'defaultTitle', 'datasetLang', 'geoDesc', 'locale', 'logo'] + + for attrib in attribs_to_delete: + try: + del out_record['metadata-fr'][attrib] + except: + pass - the_full_title = out_record['metadata-fr']['title'] - the_slug = generate_slug(the_full_title) - logging.info('Slug for "%s": %s' % (the_full_title, the_slug)) - # the_type = out_record['metadata-fr']['type'] - last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + if 'idxMsg' in in_record.keys(): + del out_record['metadata-fr']['idxMsg'] - # let's look for a WFS ressource to potentially fetch and index... - wfs_found = False + # let's take just one value among those that are in the updateFrequency (hoping that it is representative...) + if 'updateFrequency' in in_record.keys() and type(in_record['updateFrequency']) is list: + out_record['metadata-fr']['updateFrequency'] = in_record['updateFrequency'][0] - if 'link' in out_record['metadata-fr'].keys(): + #pprint(out_record) + #out_records.append(out_record) + #print('-'*80) + return out_record - for link in out_record['metadata-fr']['link']: - #TODO: generalize! 
- if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']: - logging.debug('EUREKA : found a WFS ressource!') - wfs_found = True - #documents_to_index = [] - # OLD featurePages = getWFS(metadata_record['metadata-fr']['link'][key]) - #featurePages = getWFS(link) # - full_version = out_record.copy() # including metadata AND data - full_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.full' - full_version['type'] = out_record['metadata-fr']['type'] - full_version['slug'] = the_slug - # full_version['last_update'] will be added by the doc enricher + #return out_records - # adding information about fields and types --------------------------------------------------------------------------- - dbname = link['url'].split('/')[-1] - lookup_key = "{0}/{1}".format(dbname, link['name']) - full_version['fields'] = { "list": field_catalog[lookup_key]['fields'], "types": field_catalog[lookup_key]['types'] } - # --------------------------------------------------------------------------------------------------------------------- - #create_reindex_task(cfg, channel, full_version['uuid']) - msg = dict() - msg['header'] = dict() - msg['header']['cfg'] = cfg - msg['header']['wfs_info'] = link - msg['body'] = full_version +def callback( channel, method, properties, body ): - the_body = msgpack.packb(msg, use_bin_type=True) + decoded_body = msgpack.unpackb(body, raw=False) - # - # channel.basic_publish( exchange=exchange, - # routing_key=docs_to_enrich_rk, - # body=the_body, - # properties=pika.BasicProperties(delivery_mode = 2) ) - # ------------------------------send-task-------------------------------------- + cfg = decoded_body['header']['cfg'] - # initialize RabbitMQ queues + working_directory = os.path.join(parentDir, cfg['session']['working_directory']) + credentials = cfg['credentials'] - self.rabbit.publish_task(the_body=the_body, - exchange=exchange, - routing_key=docs_to_enrich_rk, - queue_name=docs_to_enrich_qn) + in_record = decoded_body['body'] - # ------------------------------------------------------------------------- - break + out_record = process_record( in_record, working_directory, credentials ) - if not wfs_found: + # initialize RabbitMQ channel + docs_to_index_rk = cfg['rabbitmq']['routing_key_2'] + docs_to_index_qn = cfg['rabbitmq']['queue_name_2'] + # + docs_to_enrich_rk = cfg['rabbitmq']['routing_key_3'] + docs_to_enrich_qn = cfg['rabbitmq']['queue_name_3'] + + queue_logs_name = cfg['rabbitmq']['queue_logs_name'] + routing_key_logs = cfg['rabbitmq']['routing_key_logs'] + exchange_logs_name = cfg['rabbitmq']['exchange_logs_name'] + + # the following information is no longer needed, once that this script reads them + del cfg['rabbitmq']['routing_key_3'] + del cfg['rabbitmq']['queue_name_3'] + + exchange = cfg['rabbitmq']['exchange'] + + channel.exchange_declare(exchange=exchange, exchange_type='direct') + + channel.queue_declare(queue=docs_to_index_qn, durable=True) + channel.queue_bind(exchange=exchange, queue=docs_to_index_qn, routing_key=docs_to_index_rk) + channel.queue_declare(queue=queue_logs_name, durable=True) + channel.queue_bind(exchange=exchange_logs_name, queue=queue_logs_name, routing_key=routing_key_logs) + channel.queue_declare(queue=docs_to_enrich_qn, durable=True) + channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk) + + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=cfg['session']['id'], + # 
session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='metadata-processor', + status='Starting...', + uuid_prefix='meta', + info='no info' + ) + + json_body = json.dumps(log_message) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=exchange, + routing_key=routing_key_logs, + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ + + # load field catalog + filename = os.path.join( parentDir, cfg['session']['working_directory'], 'field_catalog_by_dbschematable.json' ) + with open(filename, 'r') as fp: + field_catalog = json.load(fp) + + + the_full_title = out_record['metadata-fr']['title'] + the_slug = generate_slug(the_full_title) + logging.info('Slug for "%s": %s' % (the_full_title, the_slug)) + # the_type = out_record['metadata-fr']['type'] + last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + + # let's look for a WFS ressource to potentially fetch and index... + wfs_found = False + + if 'link' in out_record['metadata-fr'].keys(): + + for link in out_record['metadata-fr']['link']: + #TODO: generalize! + if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']: + logging.debug('EUREKA : found a WFS ressource!') + wfs_found = True + #documents_to_index = [] + # OLD featurePages = getWFS(metadata_record['metadata-fr']['link'][key]) + #featurePages = getWFS(link) # + full_version = out_record.copy() # including metadata AND data + full_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.full' + full_version['type'] = out_record['metadata-fr']['type'] + full_version['slug'] = the_slug + # full_version['last_update'] will be added by the doc enricher + + # adding information about fields and types --------------------------------------------------------------------------- + dbname = link['url'].split('/')[-1] + lookup_key = "{0}/{1}".format(dbname, link['name']) + full_version['fields'] = { "list": field_catalog[lookup_key]['fields'], "types": field_catalog[lookup_key]['types'] } + # --------------------------------------------------------------------------------------------------------------------- + + #create_reindex_task(cfg, channel, full_version['uuid']) + + msg = dict() + msg['header'] = dict() + msg['header']['cfg'] = cfg + msg['header']['wfs_info'] = link + msg['body'] = full_version + + the_body = msgpack.packb(msg, use_bin_type=True) + channel.basic_publish( exchange=exchange, + routing_key=docs_to_enrich_rk, + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2) ) + + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='metadata-processor', + status='sent task doc to enrich', + uuid_prefix='meta', + info='no info' + ) + + json_body = json.dumps(log_message) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=exchange, + routing_key=routing_key_logs, + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ + + break + + + if not wfs_found: + + meta_version = out_record.copy() # including metadata ONLY + meta_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.meta' + meta_version['type'] = out_record['metadata-fr']['type'] + meta_version['slug'] = the_slug + meta_version['last_update'] 
= last_update + + #create_reindex_task(cfg, channel, meta_version['uuid']) + + msg = dict() + msg['header'] = dict() + msg['header']['cfg'] = cfg + msg['header']['progress_ratio'] = 1 + msg['header']['count'] = 1 + msg['body'] = meta_version + + the_body = msgpack.packb(msg, use_bin_type=True) + + channel.basic_publish( exchange=exchange, + routing_key=docs_to_index_rk, + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2) ) - meta_version = out_record.copy() # including metadata ONLY - meta_version['uuid'] = out_record['metadata-fr']['geonet:info']['uuid'] + '.meta' - meta_version['type'] = out_record['metadata-fr']['type'] - meta_version['slug'] = the_slug - meta_version['last_update'] = last_update - - #create_reindex_task(cfg, channel, meta_version['uuid']) - - msg = dict() - msg['header'] = dict() - msg['header']['cfg'] = cfg - msg['header']['progress_ratio'] = 1 - msg['header']['count'] = 1 - msg['body'] = meta_version - - the_body = msgpack.packb(msg, use_bin_type=True) - - # channel.basic_publish( exchange=exchange, - # routing_key=docs_to_index_rk, - # body=the_body, - # properties=pika.BasicProperties(delivery_mode = 2) ) - # - # channel.basic_ack(delivery_tag=method.delivery_tag) - - # ------------------------------send-task-------------------------------------- - - # initialize RabbitMQ queues - - self.rabbit.publish_task(the_body=the_body, - exchange=exchange, - routing_key=docs_to_index_rk, - queue_name=docs_to_index_qn) - - # ------------------------------------------------------------------------- - #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) # ---------------------- send log ---------------------------- - log_message = LogMessage(session_id=decoded_body['header']['cfg']['session']['id'], + log_message = LogMessage(session_id=cfg['session']['id'], # session_id=cfg['session']['id'], - uuid=decoded_body['header']['cfg']['session']['current_uuid'], + uuid=cfg['session']['current_uuid'], step='metadata-processor', - status='Terminated', + status='sent task doc to index', uuid_prefix='meta', info='no info' ) - self.rabbit.publish_log(log_message=log_message.__dict__) - # ------------------------------------------------------------ - return + json_body = json.dumps(log_message) + print(" [x] json body : ", json_body) - # def create_reindex_task( the_cfg, the_channel, the_uuid ): - # - # the_cfg = deepcopy(the_cfg) - # - # #connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port'])) - # #channel = connection.channel() - # exchange = the_cfg['rabbitmq']['exchange'] - # queue_name = the_cfg['rabbitmq']['queue_name_4'] - # routing_key = the_cfg['rabbitmq']['routing_key_4'] - # - # del the_cfg['rabbitmq']['queue_name_4'] - # del the_cfg['rabbitmq']['routing_key_4'] - # - # the_channel.exchange_declare(exchange=exchange, exchange_type='direct') - # the_channel.queue_declare(queue=queue_name, durable=True) - # the_channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key) - # - # msg = dict() - # - # msg['header'] = dict() - # msg['header']['cfg'] = the_cfg - # msg['body'] = the_uuid - # - # the_body = msgpack.packb(msg, use_bin_type=False) - # - # the_channel.basic_publish( exchange=exchange, - # routing_key=routing_key, - # body=the_body, - # properties=pika.BasicProperties(delivery_mode = 2) - # ) - # - # #connection.close() - # - # return - - def main(self): - - # --------------------------send task------------------------------------- - 
docs_to_enrich_qn = self.cfg['rabbitmq_queue'] - with RabbitSession(cfg=cfg) as self.rabbit: - self.rabbit.consume_queue_and_launch_specific_method(specific_method=self.callback, - specific_queue=docs_to_enrich_qn) - # ----------------------------------------------------------------------- - - # #from lib.close_connection import on_timeout - # - # # logging.debug(cfg) - # #global connection - # connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) - # # timeout = 5 - # # connection.add_timeout(timeout, on_timeout(connection)) - # channel = connection.channel() - # # exchange = cfg['rabbitmq_exchange'] - # # the queue this program will consume messages from: - # metadata_records_to_process_qn = cfg['rabbitmq_queue'] - # - # channel.basic_qos(prefetch_count=1) - # channel.basic_consume(on_message_callback=lambda ch, method, properties, body: callback(ch, method, properties, body), - # queue=metadata_records_to_process_qn)#, no_ack=True) - # - # channel.start_consuming() - # - # connection.close() + channel.basic_publish(exchange=exchange, + routing_key=routing_key_logs, + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ + + # ---------------------- send log ---------------------------- + log_message = LogMessage(session_id=cfg['session']['id'], + # session_id=cfg['session']['id'], + uuid=cfg['session']['current_uuid'], + step='metadata-processor', + status='terminated', + uuid_prefix='meta', + info='no info' + ) + + json_body = json.dumps(log_message) + + print(" [x] json body : ", json_body) + + channel.basic_publish(exchange=exchange, + routing_key=routing_key_logs, + body=json_body, + properties=pika.BasicProperties(delivery_mode=2) + ) + # ------------------------------------------------------------ + channel.basic_ack(delivery_tag = method.delivery_tag) + #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + + return + + +# def create_reindex_task( the_cfg, the_channel, the_uuid ): +# +# the_cfg = deepcopy(the_cfg) +# +# #connection = pika.BlockingConnection(pika.ConnectionParameters(host=the_cfg['rabbitmq']['host'], port=the_cfg['rabbitmq']['port'])) +# #channel = connection.channel() +# exchange = the_cfg['rabbitmq']['exchange'] +# queue_name = the_cfg['rabbitmq']['queue_name_4'] +# routing_key = the_cfg['rabbitmq']['routing_key_4'] +# +# del the_cfg['rabbitmq']['queue_name_4'] +# del the_cfg['rabbitmq']['routing_key_4'] +# +# the_channel.exchange_declare(exchange=exchange, exchange_type='direct') +# the_channel.queue_declare(queue=queue_name, durable=True) +# the_channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=routing_key) +# +# msg = dict() +# +# msg['header'] = dict() +# msg['header']['cfg'] = the_cfg +# msg['body'] = the_uuid +# +# the_body = msgpack.packb(msg, use_bin_type=False) +# +# the_channel.basic_publish( exchange=exchange, +# routing_key=routing_key, +# body=the_body, +# properties=pika.BasicProperties(delivery_mode = 2) +# ) +# +# #connection.close() +# +# return + + + +def main(cfg): + + #from lib.close_connection import on_timeout + + # logging.debug(cfg) + #global connection + connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port'])) + # timeout = 5 + # connection.add_timeout(timeout, on_timeout(connection)) + channel = connection.channel() + # exchange = cfg['rabbitmq_exchange'] + # the queue this program will consume 
messages from: + metadata_records_to_process_qn = cfg['rabbitmq_queue'] + + channel.basic_qos(prefetch_count=1) + channel.basic_consume(on_message_callback=lambda ch, method, properties, body: callback(ch, method, properties, body), + queue=metadata_records_to_process_qn)#, no_ack=True) + + channel.start_consuming() + + + connection.close() if __name__ == '__main__': @@ -518,6 +552,8 @@ if __name__ == '__main__': parser.add_argument('--queue', dest='queue', help='the RabbitMQ queue', type=str, required=True) parser.add_argument('--loglevel', dest='loglevel', help='the log level', default="INFO", type=str, choices=['INFO', 'DEBUG', 'WARN', 'CRITICAL', 'ERROR']) + parser.add_argument('--user', dest='user', help='the RabbitMQ user login', type=str, required=True) + parser.add_argument('--password', dest='password', help='the RabbitMQ user password', type=str, required=True) args = parser.parse_args() @@ -527,18 +563,15 @@ if __name__ == '__main__': cfg['rabbitmq_port'] = args.port cfg['rabbitmq_exchange'] = args.exchange cfg['rabbitmq_queue'] = args.queue - cfg['rabbitmq']['user'] = 'admin' - cfg['rabbitmq']['password'] = 'admin' - cfg['rabbitmq']['queue_logs_name'] = 'session_logs' - cfg['rabbitmq']['routing_key_logs'] = 'scripts_log_key' - cfg['rabbitmq']['exchange_logs_name'] = 'download_data_grandlyon_com_logs' + cfg['rabbitmq']['user'] = args.user + cfg['rabbitmq']['password'] = args.password logging.getLogger().setLevel(args.loglevel) logging.info('Starting...') while True: try: - MetadataProcessor(cfg=cfg).main() + main(cfg) except pika.exceptions.ChannelClosed: logging.info('Waiting for tasks...') time.sleep(5)
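
Note on the log-publishing pattern used in the callback above: a LogMessage is sent over a dedicated logs exchange at each step ('Starting...', 'sent task doc to enrich', 'sent task doc to index', 'terminated'). The following is a minimal, self-contained sketch of that pattern for reference, not the worker itself: the LogMessage stub is an assumption standing in for lib.log_message.LogMessage, the helper name publish_log is hypothetical, the object is serialized through __dict__ (as the pre-existing RabbitSession.publish_log path did) so that json.dumps accepts it, and the message is published to the logs exchange that the logs queue is bound to.

    import json

    import pika


    class LogMessage:
        """Stand-in for lib.log_message.LogMessage (assumed to be a plain attribute holder)."""
        def __init__(self, session_id, uuid, step, status, uuid_prefix, info):
            self.session_id = session_id
            self.uuid = uuid
            self.step = step
            self.status = status
            self.uuid_prefix = uuid_prefix
            self.info = info


    def publish_log(channel, cfg, status):
        """Declare/bind the logs queue and publish one persistent JSON log message."""
        queue_logs_name = cfg['rabbitmq']['queue_logs_name']
        routing_key_logs = cfg['rabbitmq']['routing_key_logs']
        exchange_logs_name = cfg['rabbitmq']['exchange_logs_name']

        # idempotent declarations, mirroring what callback() does before publishing
        channel.exchange_declare(exchange=exchange_logs_name, exchange_type='direct')
        channel.queue_declare(queue=queue_logs_name, durable=True)
        channel.queue_bind(exchange=exchange_logs_name,
                           queue=queue_logs_name,
                           routing_key=routing_key_logs)

        log_message = LogMessage(session_id=cfg['session']['id'],
                                 uuid=cfg['session']['current_uuid'],
                                 step='metadata-processor',
                                 status=status,
                                 uuid_prefix='meta',
                                 info='no info')

        # serialize the plain object via __dict__ so json.dumps can handle it
        json_body = json.dumps(log_message.__dict__)

        # delivery_mode=2 marks the message as persistent
        channel.basic_publish(exchange=exchange_logs_name,
                              routing_key=routing_key_logs,
                              body=json_body,
                              properties=pika.BasicProperties(delivery_mode=2))

With a channel obtained from the pika.BlockingConnection created in main(), this would be invoked as publish_log(channel, cfg, 'Starting...').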