Commit 5d07970d authored by Sébastien DA ROCHA

Add type to property names and remove old code

parent 3e2e8a01
Pipeline #8314 passed with stage in 11 seconds
......@@ -18,7 +18,7 @@ services:
metadata-processor:
build: .
image: registry.forge.grandlyon.com/web-et-numerique/web-et-numerique-internet/data.grandlyon.com/web-portal/components/indexers/metadata-and-data:${TAG}
command: python workers/metadata-processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue metadata_records_to_process --loglevel DEBUG
command: python workers/metadata_processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue metadata_records_to_process --loglevel DEBUG
volumes:
#- ./config.yaml:/app/config.yaml:ro
- working-directory:/app/output
......@@ -31,7 +31,7 @@ services:
doc-enricher:
build: .
image: registry.forge.grandlyon.com/web-et-numerique/web-et-numerique-internet/data.grandlyon.com/web-portal/components/indexers/metadata-and-data:${TAG}
command: python workers/doc-enricher.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_enrich --loglevel DEBUG
command: python workers/doc_enricher.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_enrich --loglevel DEBUG
# volumes:
# - ./config.yaml:/app/config.yaml:ro
restart: unless-stopped
......@@ -43,7 +43,7 @@ services:
doc-processor:
build: .
image: registry.forge.grandlyon.com/web-et-numerique/web-et-numerique-internet/data.grandlyon.com/web-portal/components/indexers/metadata-and-data:${TAG}
command: python workers/doc-processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_process --loglevel DEBUG
command: python workers/doc_processor.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_process --loglevel DEBUG
volumes:
# - ./config.yaml:/app/config.yaml:ro
- working-directory:/app/output
......@@ -56,7 +56,7 @@ services:
doc-indexer:
build: .
image: registry.forge.grandlyon.com/web-et-numerique/web-et-numerique-internet/data.grandlyon.com/web-portal/components/indexers/metadata-and-data:${TAG}
command: python workers/doc-indexer.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_index --loglevel DEBUG
command: python workers/doc_indexer.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue doc_pages_to_index --loglevel DEBUG
# volumes:
# - ./config.yaml:/app/config.yaml:ro
restart: unless-stopped
......@@ -80,7 +80,7 @@ services:
sampler:
build: .
image: registry.forge.grandlyon.com/web-et-numerique/web-et-numerique-internet/data.grandlyon.com/web-portal/components/indexers/metadata-and-data:${TAG}
command: python workers/sample-generator.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue sampling_tasks --loglevel DEBUG
command: python workers/sample_generator.py --host rabbitmq --exchange download_data_grandlyon_com_index --queue sampling_tasks --loglevel DEBUG
volumes:
- working-directory:/app/output
restart: unless-stopped
......
......@@ -34,20 +34,20 @@ template = {
"min_gram": 2,
"max_gram": 30,
"token_chars": [
"letter",
"digit",
"letter",
"digit",
"punctuation",
"symbol"
]
]
},
"my_ngram_tokenizer": {
"type": "ngram",
"min_gram": 2,
"max_gram": 30,
"token_chars": [
"letter",
"digit"
]
"letter",
"digit"
]
}
},
"analyzer": {
......
......@@ -33,7 +33,6 @@ def detect_type( value ):
def convert_type( el ):
#print()
if type(el) is str:
if el.strip() == '' or el.strip() == 'None':
......@@ -41,45 +40,26 @@ def convert_type( el ):
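# Conversion cascade: try to parse the value as int, then float, then boolean,
# then (when the date regex matches) datetime; if every conversion fails,
# the original string is returned unchanged.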
# try parsing as integer
try:
#print('INT?', el)
el_as_int = convert_to_int(el)
#parsed_samples.append(el_as_int)
if str(el_as_int) == el:
#print('yes!')
return el_as_int
# else:
# parsed_samples.append(el)
except:
#print('not an INT')
pass
#return 'unknown'
# try parsing as float
try:
#print('FLOAT?', el)
el_as_float = convert_to_float(el)
#parsed_samples.append(el_as_float)
if str(el_as_float) == el.rstrip('0'):
#print('yes!')
return el_as_float
# else:
# parsed_samples.append(el)
except:
#print('not a FLOAT')
#return 'unknown'
pass
# try parsing as boolean
try:
el_as_boolean = convert_to_boolean(el)
return el_as_boolean
except:
pass
......@@ -88,13 +68,10 @@ def convert_type( el ):
if bool(myregex.match(el)): #TODO:
# try parsing as date
try:
#print('DATE?', el)
el_as_date = convert_to_datetime(el)
#print(el_as_date)
return el_as_date
except:
pass
#print('not a DATE')
return el
......
......@@ -56,6 +56,7 @@ def setup_indices(cfg):
}
try:
es_body['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas']
rep = source_es.indices.create(source_index, es_body)#, wait_for_active_shards=0)
except Exception as e:
logging.error(e)
......
"""
Generic pytest fixtures
"""
import msgpack
from lib.serializers import encode_datetime, decode_datetime
import re
import os
import pytest
def _sanitize(name):
"""
Replace characters that might be problematic when the string is used as a
file name with '-'.
Imported from https://github.com/betamaxpy/betamax/blob/master/src/betamax/fixtures/pytest.py
"""
return re.sub(r'[\s/<>:\\"|?*]', '-', name)
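# e.g. _sanitize('test_enrich[foo/bar baz]') -> 'test_enrich[foo-bar-baz]'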
@pytest.fixture
def load_object():
def load_file(file_path):
with open(file_path, 'rb') as file_stream:
expected_data = msgpack.unpackb(file_stream.read(), raw=False, object_hook=decode_datetime)
return expected_data
yield load_file
@pytest.fixture
def verify_objects(request):
"""
Fixture that compares data to the expected data stored in a file.
If the expected-data file does not exist, it is created from the current data.
"""
module = request.module.__name__
node = _sanitize(request.node.name)
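# Expected data lives under tests/data/, keyed by the test module, the sanitized
# node name and, optionally, an iteration index (see file_path below).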
def check_data(data, iteration=None):
if iteration is not None:
file_path = f'tests/data/{module}.{node}.{iteration}.mp'
else:
file_path = f'tests/data/{module}.{node}.mp'
if not os.path.exists(file_path):
with open(file_path, "wb") as file_stream:
the_body = msgpack.packb(data, use_bin_type=True, default=encode_datetime)
file_stream.write(the_body)
with open(file_path, 'rb') as file_stream:
expected_data = msgpack.unpackb(file_stream.read(), raw=False, object_hook=decode_datetime)
assert data == expected_data
yield check_data
This source diff could not be displayed because it is too large. You can view the blob instead.
import pytest
from workers.doc_enricher import get_entries_from_postgis
@pytest.mark.vcr()
def test_get_entries_from_postgis(verify_objects):
link = dict(url='/rdata',
name='apd_apidae.apdevenement_2_0_0')
cfg = dict()
cfg['host'] = "147.135.219.0"
cfg['username'] = "grandlyon"
cfg['password'] = ""
cfg['session'] = dict(working_directory='tests/data/prod_working_directory/')
entries = get_entries_from_postgis(link, cfg)
assert entries
for i, entry in enumerate(entries):
progress, count, serialized_deferred_count, feature_page = entry
verify_objects(feature_page, iteration=i)
from workers.doc_processor import fix_field_types
import os
import json
def test_fix_field_types(load_object, verify_objects):
link = dict()
link['url'] = "/rdata"
link['name'] = "apd_apidae.apdevenement_2_0_0"
filename = os.path.join('tests/data/prod_working_directory/',
'field_catalog_by_dbschematable.json' )
if os.path.isfile(filename):
with open(filename, 'r') as fp:
catalog = json.load(fp)
docs = load_object('tests/data/test_doc_enricher.test_get_entries_from_postgis.0.mp')
doc_page = [{'data-fr': feature} for feature in docs]
out_docs = fix_field_types(doc_page, catalog, link)
verify_objects(out_docs)
......@@ -24,7 +24,6 @@ def elect_field_type( data ):
logging.info("Electing field types...")
fields = data['fields'].keys()
types = {}
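# types maps each field name to the list of per-table elected types;
# the inter-dataset pass below resolves conflicts across tables.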
......@@ -36,8 +35,6 @@ def elect_field_type( data ):
# intra-dataset election
for db_schema_table, analysis in data['fields'][k].items():
#logging.info('\nAnalyzing table: %s' % (db_schema_table))
found_types = set(analysis['types'].keys())
if found_types == set(['NoneType']):
......@@ -49,11 +46,8 @@ def elect_field_type( data ):
if 'NoneType' in found_types:
found_types.remove('NoneType')
#if not all(x==found_types[0] for x in found_types): # NOT SAME TYPE: WHICH ONE TO CHOOSE?
if len( found_types ) > 1:
logging.warning('Conflicting datatypes for field "%s" within the table %s.' % (k, db_schema_table))
#print('WARNING - MIXED TYPES', parsed_types)
#logging.warning('WARNING - MIXED TYPES %s' % found_types)
if 'str' in found_types:
logging.warning('Found %s => str wins the election!' % found_types)
......@@ -67,18 +61,13 @@ def elect_field_type( data ):
logging.warning('Found %s => str wins the election!' % found_types)
types[k].append('str')
# elif parsed_types == []:
# print('WARNING - NULL VALUES', k, uuid)
# #print('WARNING - NULL VALUES', parsed_types)
# #types[k].append('null') # NULL TYPE
# #types[k].append(type(str())) #TODO
else:
types[k].append(found_types.pop())
# inter-dataset election
for k, v in types.items():
if len( set(v) ) > 1:# and 'null' not in set(v):
if len( set(v) ) > 1:
v = set(filter(None, v))
if len(v) == 1:
......@@ -102,32 +91,21 @@ def elect_field_type( data ):
logging.warning( 'UNKNOWN Type conflict: %s, %s => str wins!' % (k, v) )
types[k] = ['str']
# for element in itertools.product(*[fields,fields]):
# if element[0] != element[1]:
#
# myre = re.compile(r'%s\.[^0-9]+' % element[0])
#
# if myre.findall(element[1]) != []:
# logging.warning("%s, %s" % (element[0], element[1]))
# how to handle unknown values for dates ???
result = {}
for i, (k, v) in enumerate(types.items()):
#print(i, k, list(set(v)))
try:
result[k] = list(set(v))[0]#.__name__
except IndexError:
pass # means we got NULL VALUES
#print(k, v)
logging.info('Elections: done!')
return result
def generate_field_catalog( cfg ) :#, catalog=None ):
def generate_field_catalog( cfg ):
"""
1st output:
----------------------
......@@ -142,13 +120,6 @@ def generate_field_catalog( cfg ) :#, catalog=None ):
| - elected type
"""
# if catalog == None or catalog == dict():
# output = {}
# output['analyzed_docs'] = 0
# output['fields'] = {}
# else:
# output = catalog.copy()
types_by_dbschematable_by_field = {}
types_by_dbschematable_by_field['analyzed_docs'] = 0
types_by_dbschematable_by_field['fields'] = {}
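# Resulting shape: {'analyzed_docs': <int>,
#                   'fields': {<field>: {<db_schema_table>: {'types': {<detected_type>: <count>}}}}}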
......@@ -178,33 +149,16 @@ def generate_field_catalog( cfg ) :#, catalog=None ):
schema_dot_table_whitelist = None
schema_whitelist = None
#print(whitelist)
# selected_schema = "sit_sitra"
# selected_table = selected_schema + ".sittourisme"
# found = False
logging.info('Getting schemas...')
schema_names = pg.get_schema_names()
logging.info('Done.')
#print(schema_names)
for schema_name in schema_names:
if schema_whitelist is not None:
if schema_name not in schema_whitelist:
logging.debug('Skipping schema %s' % schema_name)
continue
# if schema_name != selected_schema:
# continue
# print(schema_name)
for table in pg.get_tables(schema_name):
# print(schema_name)
# if str(table) != selected_table:
# continue
# else:
# found = True
if schema_dot_table_whitelist is not None:
if str(table) not in schema_dot_table_whitelist:
logging.debug('Skipping table %s' % str(table))
......@@ -256,20 +210,11 @@ def generate_field_catalog( cfg ) :#, catalog=None ):
types_by_dbschematable_by_field['fields'][kk][db_schema_table]['types'][detected_type] += 1
types_by_dbschematable_by_field['analyzed_docs'] += 1
#print(types_by_dbschematable_by_field['analyzed_docs'])
# useful for debugging:
if cfg['field_type_detector']['debug'] and types_by_dbschematable_by_field['analyzed_docs'] > 1000:
return (types_by_dbschematable_by_field, elected_type_by_field_by_dbschematable)
# if found == True:
# break
#
# if found == True:
# break
return (types_by_dbschematable_by_field, elected_type_by_field_by_dbschematable)
......@@ -284,21 +229,10 @@ def main(cfg):
if rewrite == False:
if os.path.isfile(filename1) or os.path.isfile(filename2):
#print("(Some of the) output files are already present, and rewrite is disabled. Exiting!")
raise Exception("(Some of the) output files are already present, and rewrite is disabled!")
# dbnames = cfg['postgis']['databases']
# field_catalog = {}
logging.info('Building catalog...')
# for dbname in dbnames:
# logging.info('Analyzing database %s...' % dbname)
# logging.info('Establishing a database connection...')
# pg_connection = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password'])
# logging.info('Done.')
#field_catalog = {}
field_catalog_by_field, field_catalog_by_dbschematable = generate_field_catalog( cfg )#, field_catalog )
field_catalog_by_field, field_catalog_by_dbschematable = generate_field_catalog( cfg )
logging.info("Catalog: built. %i docs were analyzed. " % field_catalog_by_field['analyzed_docs'])
# writing results to disk
......@@ -310,13 +244,6 @@ def main(cfg):
elected_field_types = elect_field_type( field_catalog_by_field )
# # writing results to disk
# if not os.path.exists(working_directory):
# os.mkdir(working_directory)
#
# with open(filename1, 'w') as fp:
# json.dump(field_catalog, fp, sort_keys=True)
with open(filename2, 'w') as fp:
json.dump(elected_field_types, fp, sort_keys=True)
......@@ -362,28 +289,3 @@ if __name__ == '__main__':
logging.info('Done!')
except Exception as e:
logging.error(e)
#main(wait=False, rewrite=True)
# while True:
#
# try:
# main(cfg)
# except NotEmptyQueueException as e:
# logging.error(e)
# logging.error("Retrying in 5 seconds...")
# time.sleep(5)
# #finally:
# # exit(0)
# except pika.exceptions.ChannelClosed:
# logging.info("Waiting for RabbitMQ channel to be open...")
# time.sleep(5)
# except pika.exceptions.ConnectionClosed:
# logging.info("Waiting for RabbitMQ to be reachable...")
# time.sleep(5)
# except Exception as e:
# #logging.error('here')
# logging.error(e)
# logging.error('Retrying in 5 seconds...')
# time.sleep(5)
......@@ -54,7 +54,6 @@ def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
feature_page = [] # we accumulate entries in this sort of buffer
cnt = 0
for entry in pg.get_entries(table):
# TODO Ajouter le type dans les colonnes par ici
feature_page.append(entry)
if len(feature_page) == no_features_per_page:
......@@ -128,6 +127,7 @@ def enrich_docs( channel, method, properties, body ):
wfs_info = decoded_body['header']['wfs_info']
cfg = decoded_body['header']['cfg']
# initialize RabbitMQ queues
exchange = cfg['rabbitmq']['exchange']
doc_pages_to_process_qn = cfg['rabbitmq']['queue_name_5']
......@@ -184,41 +184,21 @@ def enrich_docs( channel, method, properties, body ):
properties=pika.BasicProperties(delivery_mode = 2)
)
#logging.info('...done!')
# except TypeError: # it means that getWFS returned None
# pass
# #
# #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
channel.basic_ack(delivery_tag = method.delivery_tag)
#channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
return #out_docs
def main(cfg):
#from lib.close_connection import on_timeout
connection = pika.BlockingConnection(pika.ConnectionParameters(host=cfg['rabbitmq_host'], port=cfg['rabbitmq_port']))
#timeout = 5
#connection.add_timeout(timeout, on_timeout(connection))
channel = connection.channel()
# exchange = cfg['rabbitmq_exchange']
# the queue this program will consume messages from:
docs_to_enrich_qn = cfg['rabbitmq_queue']
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message_callback=lambda ch, method, properties, body:
enrich_docs(ch, method, properties, body),
#doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk,
#docs_to_enrich_rk=docs_to_enrich_rk,
#doc_pages_to_process_rk=doc_pages_to_process_rk,
#features_per_page=cfg['wfs']['features_per_page'],
#postgis_cfg=cfg['postgis']),
queue=docs_to_enrich_qn)#, no_ack=True)
queue=docs_to_enrich_qn)
channel.start_consuming()
connection.close()
......
......@@ -21,8 +21,12 @@ class FieldTypeNotFound(Exception):
pass
def fix_field_types( in_docs, out_types ):
def fix_field_types( in_docs, field_catalog, link):
re_decimal = re.compile(r'\.\d+')
dbname = link['url'].split('/')[-1]
types = field_catalog[f"{dbname}/{link['name']}"]["types"]
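# e.g. with link = {'url': '/rdata', 'name': 'apd_apidae.apdevenement_2_0_0'}
# (values taken from the tests above), the catalog key is
# 'rdata/apd_apidae.apdevenement_2_0_0'.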
out_docs = []
for in_doc in in_docs:
......@@ -44,47 +48,55 @@ def fix_field_types( in_docs, out_types ):
continue
# LOOKUP, ex.: the type of a field named "thefield.12.thesubfield" can be found in the catalog by looking for "thefield.0.thesubfield"
lookup_key = re.sub(r'\.\d+', '.0', prop)
lookup_key = re_decimal.sub('.0', prop)
if lookup_key not in out_types.keys():
if lookup_key not in types.keys():
raise FieldTypeNotFound(lookup_key)
if out_types[lookup_key] == 'str':
if types[lookup_key] == 'str':
out_flattened_properties[prop] = convert_to_str(in_flattened_properties[prop])
elif out_types[lookup_key] == 'int':
elif types[lookup_key] == 'int':
out_flattened_properties[prop] = convert_to_int(in_flattened_properties[prop])
elif out_types[lookup_key] == 'float':
elif types[lookup_key] == 'float':
out_flattened_properties[prop] = convert_to_float(in_flattened_properties[prop])
elif out_types[lookup_key] in ['date', 'datetime']:
elif types[lookup_key] in ['date', 'datetime']:
#out_flattened_properties[prop] = convert_to_datetime(in_flattened_properties[prop]).strftime('%Y-%m-%dT%H:%M:%SZ')
# NOTE: the following makes sure that (non-standard) original date formats are not altered
out_flattened_properties[prop] = convert_to_str(in_flattened_properties[prop])
elif out_types[lookup_key] == 'bool':
elif types[lookup_key] == 'bool':
out_flattened_properties[prop] = convert_to_boolean(in_flattened_properties[prop])
elif not out_types[lookup_key]:
elif not types[lookup_key]:
# Reaching this branch means a value was found for this particular field,
# so its type should not be null; to fix that, we trigger a rebuild
# of the catalog.
logging.debug('type %s found, recreating fields catalog', out_types[lookup_key])
logging.debug('type %s found, recreating fields catalog', types[lookup_key])
raise FieldTypeNotFound(lookup_key)
else:
logging.critical('type %s not supported', out_types[lookup_key])
logging.critical('type %s not supported', types[lookup_key])
sys.exit(1)
out_doc = in_doc.copy()
out_doc['data-fr']['properties'] = unflatten_json(out_flattened_properties)
def typed_name(name, types):
type_of_name = types.get(name)
if not type_of_name:
return f'{name}_json'
return f'{name}_{type_of_name}'
# Add type to prop name
out_flattened_properties = {typed_name(key, types): value
for key, value in unflatten_json(out_flattened_properties).items()}
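# e.g. a field 'surface' elected as float becomes 'surface_float'; fields without
# an elected type get a '_json' suffix ('surface' is an illustrative name).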
out_doc = in_doc.copy()
out_doc['data-fr']['properties'] = out_flattened_properties
# amending addresses which are not in the schema.org format; we use out_doc in order to avoid reintroducing null values!
if 'address' in out_doc