diff --git a/2-metadata-processor.py b/2-metadata-processor.py
index 82d61d513d1e61be29a4ebdff6d989359e6069cf..a9847db696eb41f8480ae32a2713c04af050d1a1 100644
--- a/2-metadata-processor.py
+++ b/2-metadata-processor.py
@@ -280,6 +280,8 @@ def process_page( channel, method, properties, body, **kwargs):
     #featurePages = getWFS(link)
     # full_version = metadata_record.copy() # including metadata AND data
     full_version['uuid'] = metadata_record['metadata-fr']['geonet:info']['uuid'] + '.full'
+    full_version['type'] = metadata_record['metadata-fr']['type']
+
     msg = {'header': {'wfs_info': link, 'offset': 0, 'session_id': session_id, 'dest_index': dest_index}, 'body': full_version}
     the_body = msgpack.packb(msg, use_bin_type=True)
@@ -294,6 +296,7 @@ def process_page( channel, method, properties, body, **kwargs):
     meta_version = metadata_record.copy() # including metadata ONLY
     meta_version['uuid'] = metadata_record['metadata-fr']['geonet:info']['uuid'] + '.meta'
+    meta_version['type'] = metadata_record['metadata-fr']['type']
     msg = {'header': { "index" : { "_index" : dest_index, "_type" : "_doc" } }, 'body': meta_version}
     the_body = msgpack.packb(msg, use_bin_type=True)
diff --git a/5-pg-field-type-detector.py b/5-pg-field-type-detector.py
index 15faf2845f2aa7da3d83cc7eea653159d4d180be..a3f065a8ecb2fc1218b8c47818877bc863d3c70e 100644
--- a/5-pg-field-type-detector.py
+++ b/5-pg-field-type-detector.py
@@ -1,34 +1,12 @@
 import sys
 import json
 import re
-#import pika
-#import itertools
 import time
-# from sqlalchemy import create_engine
-# from sqlalchemy.engine import reflection
-# from sqlalchemy import MetaData
-# from sqlalchemy import select
-# from sqlalchemy import Table
-#
-# # Then load the Geometry type
-# from geoalchemy2 import Geometry # noqa
-#
-# # --------------------------------------------------------------------
-# import psycopg2.extensions
-# DEC2FLOAT = psycopg2.extensions.new_type(
-#     psycopg2.extensions.DECIMAL.values,
-#     'DEC2FLOAT',
-#     lambda value, curs: float(value) if value is not None else None)
-# psycopg2.extensions.register_type(DEC2FLOAT)
-# # --------------------------------------------------------------------
 from utils.flatten_utils import flatten_json, unflatten_json
 from utils.type_utils import detect_type, detect_type_new
-#from pymongo import MongoClient
 from utils.exit_gracefully import exit_gracefully
 from utils.my_logging import logging
-#from tqdm import tqdm
-
 from utils.postgis_helper import Remote
@@ -189,7 +167,7 @@ def elect_field_type( data ):
 
     return result
 
-def generate_field_catalog( cfg, pg, catalog=None ):
+def generate_field_catalog( cfg, catalog=None ):
 
     if catalog == None or catalog == dict():
         output = {}
@@ -198,87 +176,110 @@ def generate_field_catalog( cfg, catalog=None ):
     else:
         output = catalog.copy()
 
-    # selected_schema = "sit_sitra"
-    # selected_table = selected_schema + ".sittourisme"
-    # found = False
-
-    logging.info('Getting schemas...')
-    schema_names = pg.get_schema_names()
-    logging.info('Done.')
+    dbnames = cfg['postgis']['databases'].keys()
 
     t1 = time.time()
 
-    #print(schema_names)
-
-    for schema_name in schema_names:
-        # if schema_name != selected_schema:
-        #     continue
-        # print(schema_name)
-        for table in pg.get_tables(schema_name):
-            # print(schema_name)
-            # if str(table) != selected_table:
-            #     continue
-            # else:
-            #     found = True
-            try:
-                count = pg.count_entries(table)
-            except:
-                count = 'unknown no. of'
-            #print(count)
-
-            db_schema_table = '%s/%s' % (pg.dbname, table)
-            t2 = time.time()
-            logging.info('Analyzing table %s (%s records)...' % (db_schema_table, count))
-            logging.info('| %i records were analyzed so far (%.2f records/s)' % (output['analyzed_docs'], output['analyzed_docs']/(t2-t1)))
-            # print(table)
-            # columns, geom = pg.get_columns(table)
-            # fields = [table.c[col.name] for col in columns]
-            # selected = select(fields)
-            # #detected = {}
-            # for entry in pg.engine.execute(selected):
-            #     print(entry)
-            #
-            # exit(0)
-
-            for doc in pg.get_entries(table):
-
-                properties = doc['properties']
-
-                flattened_properties = flatten_json(properties)
-                # ---------------------------------------------------------------------------------------------
-                for k, v in flattened_properties.items():
-
-                    # generate equivalent element kk out of k (in the sense of equivalence classes, cf. https://en.wikipedia.org/wiki/Equivalence_class)
-                    # ex.: openinghoursspecification.XX.validThrough -> openinghoursspecification.0.validThrough
-                    # N.B.: integers like .0, .1, .2, ... are inserted by the flatten_json function whenever a list is found...
-                    kk = re.sub(r'\.\d+', '.0', k)
-
-                    if kk not in output['fields'].keys():
-                        output['fields'][kk] = {}
+    for dbname in dbnames:
+        logging.info('Analyzing database %s...' % dbname)
+        logging.info('Establishing a database connection...')
+        pg = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password'])
+        logging.info('Done.')
+        #field_catalog = generate_field_catalog( cfg, pg_connection, field_catalog )
 
-                    if db_schema_table not in output['fields'][kk].keys():
-                        output['fields'][kk][db_schema_table] = {'types': dict()} #'title': dataset_title, 'types': []}
+        if 'whitelist' in cfg['postgis']['databases'][dbname].keys():
+            whitelist = cfg['postgis']['databases'][dbname]['whitelist']
+        else:
+            whitelist = None
 
-                    detected_type = detect_type(v)
+        #print(whitelist)
 
-                    if detected_type not in output['fields'][kk][db_schema_table]['types'].keys():
-                        #print('was here, too')
-                        output['fields'][kk][db_schema_table]['types'][detected_type] = 0
+        # selected_schema = "sit_sitra"
+        # selected_table = selected_schema + ".sittourisme"
+        # found = False
 
-                    output['fields'][kk][db_schema_table]['types'][detected_type] += 1
+        logging.info('Getting schemas...')
+        schema_names = pg.get_schema_names()
+        logging.info('Done.')
 
-                output['analyzed_docs'] += 1
-                #print(output['analyzed_docs'])
-                # useful for debugging:
-                if cfg['field_type_detector']['debug'] and output['analyzed_docs'] > 1000:
-                    return output
+        #print(schema_names)
 
-        # if found == True:
-        #     break
-    #
-    # if found == True:
-    #     break
+        for schema_name in schema_names:
+            # if schema_name != selected_schema:
+            #     continue
+            # print(schema_name)
+            for table in pg.get_tables(schema_name):
+                # print(schema_name)
+                # if str(table) != selected_table:
+                #     continue
+                # else:
+                #     found = True
+                if whitelist is not None:
+                    if str(table) not in whitelist:
+                        continue
+
+                try:
+                    count = pg.count_entries(table)
+                except:
+                    count = 'unknown no. of'
+                #print(count)
+
+                db_schema_table = '%s/%s' % (pg.dbname, table)
+                t2 = time.time()
+                logging.info('Analyzing table %s (%s records)...' % (db_schema_table, count))
+                logging.info('| %i records were analyzed so far (%.2f records/s)' % (output['analyzed_docs'], output['analyzed_docs']/(t2-t1)))
+                # print(table)
+                # columns, geom = pg.get_columns(table)
+                # fields = [table.c[col.name] for col in columns]
+                # selected = select(fields)
+                # #detected = {}
+                # for entry in pg.engine.execute(selected):
+                #     print(entry)
+                #
+                # exit(0)
+
+                for doc in pg.get_entries(table):
+
+                    properties = doc['properties']
+
+                    flattened_properties = flatten_json(properties)
+
+                    # ---------------------------------------------------------------------------------------------
+                    for k, v in flattened_properties.items():
+
+                        # generate equivalent element kk out of k (in the sense of equivalence classes, cf. https://en.wikipedia.org/wiki/Equivalence_class)
+                        # ex.: openinghoursspecification.XX.validThrough -> openinghoursspecification.0.validThrough
+                        # N.B.: integers like .0, .1, .2, ... are inserted by the flatten_json function whenever a list is found...
+                        kk = re.sub(r'\.\d+', '.0', k)
+
+                        if kk not in output['fields'].keys():
+                            output['fields'][kk] = {}
+
+                        if db_schema_table not in output['fields'][kk].keys():
+                            output['fields'][kk][db_schema_table] = {'types': dict()} #'title': dataset_title, 'types': []}
+
+                        detected_type = detect_type(v)
+
+                        if detected_type not in output['fields'][kk][db_schema_table]['types'].keys():
+                            #print('was here, too')
+                            output['fields'][kk][db_schema_table]['types'][detected_type] = 0
+
+                        output['fields'][kk][db_schema_table]['types'][detected_type] += 1
+
+                    output['analyzed_docs'] += 1
+                    #print(output['analyzed_docs'])
+
+                    # useful for debugging:
+                    if cfg['field_type_detector']['debug'] and output['analyzed_docs'] > 1000:
+                        return output
+
+            # if found == True:
+            #     break
+        #
+        # if found == True:
+        #     break
 
     return output
 
@@ -298,16 +299,17 @@ def main(cfg):
             raise Exception("(Some of the) output files are already present, and rewrite is disabled!")
 
-    dbnames = cfg['postgis']['databases']
-    field_catalog = {}
+    # dbnames = cfg['postgis']['databases']
+    # field_catalog = {}
 
     logging.info('Building catalog...')
-    for dbname in dbnames:
-        logging.info('Analyzing database %s...' % dbname)
-        logging.info('Establishing a database connection...')
-        pg_connection = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password'])
-        logging.info('Done.')
-        field_catalog = generate_field_catalog( cfg, pg_connection, field_catalog )
+    # for dbname in dbnames:
+    #     logging.info('Analyzing database %s...' % dbname)
+    #     logging.info('Establishing a database connection...')
+    #     pg_connection = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password'])
+    #     logging.info('Done.')
+    #field_catalog = {}
+    field_catalog = generate_field_catalog( cfg )#, field_catalog )
 
     logging.info("Catalog: built. %i docs were analyzed." % field_catalog['analyzed_docs'])
 
" % field_catalog['analyzed_docs']) # writing results to disk diff --git a/7-doc-indexer.py b/7-doc-indexer.py index 9bd8c0d308f31f9653fc40d1270d1b0270aa3420..6bf1e2c6c743eb19f8980a1ec8ef97e8fb56b81d 100644 --- a/7-doc-indexer.py +++ b/7-doc-indexer.py @@ -87,42 +87,42 @@ def index_docs(channel, method, properties, body, es): #print(docs_to_index) - es_index = decoded_body['header']['index']['_index'] - es_body = { "settings" : { - "number_of_shards" : 1, - "number_of_replicas" : 0, - "index.mapping.total_fields.limit": 10000, - "refresh_interval": "30s" - }, - "mappings": { - "_doc": { - "dynamic_templates": [ # priority is given by order! - { - "uuid" : { - "path_match": "uuid", - "mapping": { - "type": "keyword", - } - } - }, - { - "default" : { - "path_match": "*", - "mapping": { - "enabled": "false" - } - } - } - ] - } - } - } - #es_body.update({"mappings": {"_doc": {"dynamic_date_formats": ["strict_date_optional_time"]}}}) - - try: - rep = es.indices.create(es_index, es_body)#, wait_for_active_shards=0) - except: - pass + # es_index = decoded_body['header']['index']['_index'] + # es_body = { "settings" : { + # "number_of_shards" : 1, + # "number_of_replicas" : 0, + # "index.mapping.total_fields.limit": 10000, + # "refresh_interval": "30s" + # }, + # "mappings": { + # "_doc": { + # "dynamic_templates": [ # priority is given by order! + # { + # "uuid" : { + # "path_match": "uuid", + # "mapping": { + # "type": "keyword", + # } + # } + # }, + # { + # "default" : { + # "path_match": "*", + # "mapping": { + # "enabled": "false" + # } + # } + # } + # ] + # } + # } + # } + # #es_body.update({"mappings": {"_doc": {"dynamic_date_formats": ["strict_date_optional_time"]}}}) + # + # try: + # rep = es.indices.create(es_index, es_body)#, wait_for_active_shards=0) + # except: + # pass logging.info("Pushing %i documents to Elasticsearch..." 
diff --git a/8-reindexer.py b/8-reindexer.py
index 57746973f4aa248fb9da3ab7b3ed256f5f24408e..ef988e16cec9ea54f204d12f737bf3b6d34b555e 100644
--- a/8-reindexer.py
+++ b/8-reindexer.py
@@ -13,6 +13,8 @@ def main(cfg, wait=True):
 
     from es_template import template
     template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
+    template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards']
+    template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
 
 
     if wait:
@@ -49,7 +51,7 @@ def main(cfg, wait=True):
             #"remote": { "host": cfg['reindexer']['source_url'] },
             "index": cfg['reindexer']['source_index'],
             "type": "_doc",
-            "size": 100
+            "size": 1000
         },
         "dest": {
             "index": cfg['reindexer']['destination_index'],
diff --git a/config.template.yaml b/config.template.yaml
index 725694055c64f7c3c0118d93d8827ff9c97d47ff..781ccd783542a77a5cdd0cdf4ff5255b90ba8c2d 100644
--- a/config.template.yaml
+++ b/config.template.yaml
@@ -40,9 +40,13 @@ postgis:
   password: <the_password>
   host: <the_hostname>
   databases:
-    - <1st database to analyze>
-    - <2nd database to analyze>
-    - <...>
+    <1st database to analyze>:
+    <2nd database to analyze>:
+      whitelist:
+        - <1st schema.table to analyze>
+        - <2nd schema.table to analyze>
+        - <...>
+    <...>:
 
 metadata_getter:
   uuids_to_get:
@@ -71,3 +75,5 @@ reindexer:
   destination_index: <ex.: download.data.grandlyon.com.v2>
   template_name: <ex.: download.data.grandlyon.com.v2>
   template_index_pattern: <ex.: download.data.grandlyon.com.v2>
+  number_of_shards: <the_number_of_shards_of_the_destination_index>
+  number_of_replicas: <the_number_of_replicas_of_the_destination_index>
diff --git a/es_template.py b/es_template.py
index fb5706fb2755a5509b27afad82106e8bd95de3fb..0fd9c28f37036f306d99557fde8352411a27f7b3 100644
--- a/es_template.py
+++ b/es_template.py
@@ -4,8 +4,8 @@ template = {
     "settings" : {
         "index.mapping.total_fields.limit": 10000,
         #"index.mapping.ignore_malformed": True,
-        "number_of_shards" : 1,
-        "number_of_replicas" : 0,
+        # "number_of_shards" : 48,
+        # "number_of_replicas" : 0,
         "max_ngram_diff": 100,
         "analysis": {
             "filter": {
diff --git a/utils/postgis_helper.py b/utils/postgis_helper.py
index 1b26ba58900bec2898e76e47a75f7d54c3ff52ac..96ecb460d24345e50096f4879250ede2822b15d2 100644
--- a/utils/postgis_helper.py
+++ b/utils/postgis_helper.py
@@ -69,7 +69,7 @@ class Remote(object):
         return [name for name in self.inspect.get_schema_names() if name != 'information_schema']
 
     def get_tables(self, schema=None):
-        for table_name in self.inspect.get_table_names(schema=schema):
+        for table_name in self.inspect.get_table_names(schema=schema) + self.inspect.get_view_names(schema=schema):
             if table_name == 'spatial_ref_sys':
                 continue
             yield self.get_table(table_name, schema=schema)