Commit 3e2e8a01 authored by Sébastien DA ROCHA's avatar Sébastien DA ROCHA
Browse files

remove dead code

parent a6d7399d
Pipeline #8193 passed with stage
in 11 seconds
......@@ -20,65 +20,6 @@ class NotEmptyQueueException(Exception):
pass
def old_generate_field_catalog(cfg, mongo_collection):
    """Scan every document of *mongo_collection* and build a catalog of the
    fields found under their 'data-fr' properties.

    Flattened keys that differ only by a list index (``.0``, ``.1``, ``.2``,
    ... inserted by ``flatten_json``) are collapsed onto one representative
    key ending in ``.0`` (equivalence classes, cf.
    https://en.wikipedia.org/wiki/Equivalence_class), so all elements of a
    list feed the same catalog entry.

    Args:
        cfg: configuration dict; when ``cfg['field_type_detector']['debug']``
            is truthy the scan stops after 1000 documents (useful for debugging).
        mongo_collection: MongoDB collection holding the dataset documents.

    Returns:
        dict with two keys:
            ``'fields'``: {flattened_key: {dataset_uuid: {'title': ...,
                'types': [detected types, unique]}}}
            ``'analyzed_docs'``: number of documents processed.
    """
    docs = mongo_collection.find()
    total = mongo_collection.count_documents({})
    catalog = {'fields': {}, 'analyzed_docs': 0}
    for doc in docs:
        properties = doc['data-fr']['properties']
        dataset_uuid = doc['metadata-fr']['geonet:info']['uuid']
        dataset_title = doc['metadata-fr']['title']
        flattened_properties = flatten_json(properties)
        for k, v in flattened_properties.items():
            # Collapse any list index to '.0' so equivalent keys share one entry.
            kk = re.sub(r'\.\d+', '.0', k)
            # setdefault replaces the explicit "not in ....keys()" membership tests.
            per_dataset = catalog['fields'].setdefault(kk, {})
            entry = per_dataset.setdefault(dataset_uuid,
                                           {'title': dataset_title, 'types': []})
            # Accumulate detected types without duplicates; keep a list for
            # downstream (JSON/BSON) serializability.
            type_set = set(entry['types'])
            type_set.add(detect_type(v))
            entry['types'] = list(type_set)
        catalog['analyzed_docs'] += 1
        # Lazy %-args: the message is only formatted if the record is emitted.
        logging.info("%s/%s", catalog['analyzed_docs'], total)
        # In debug mode, stop early to keep runs short.
        if cfg['field_type_detector']['debug'] and catalog['analyzed_docs'] > 1000:
            break
    return catalog
def elect_field_type( data ):
logging.info("Electing field types...")
......
......@@ -54,6 +54,8 @@ def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
feature_page = [] # we accumulate entries in this sort of buffer
cnt = 0
for entry in pg.get_entries(table):
# TODO Ajouter le type dans les colonnes par ici
feature_page.append(entry)
if len(feature_page) == no_features_per_page:
cnt += 1
......@@ -84,13 +86,11 @@ def get_wfs( link, credentials, offset=0, no_features_per_page=1000 ):
#params['startindex'] = 11
params['SRSNAME'] = 'epsg:4326'
#startindex = 0
cnt = offset / no_features_per_page + 1
logging.info('WFS page %i; offset = %i' % (cnt, offset))
params['startindex'] = offset #0 + cnt*no_features_per_page
#params['to'] = params['from'] + no_records_per_page - 1
with_credentials = False
for domain in credentials:
......@@ -109,104 +109,16 @@ def get_wfs( link, credentials, offset=0, no_features_per_page=1000 ):
logging.debug(res.url)
try:
# print(res.status_code)
# print(res.text)
# print(res.json())
features = res.json()['features']
#processed_features = process_features(features)
logging.debug(len(features))
return features
# if len(features) < no_features_per_page:
# break # it means that we have reached the last page
#
# cnt += 1
except: #json.decoder.JSONDecodeError: # it means that we the WFS request failed, for instance because of insufficient right access
#logging.error("Failed WFS request: %s" % res.url)
logging.error("Failed WFS request: %s" % res.url)
#yield None
#raise Exception("Failed WFS request: %s" % res.url)
return None
#print()
def old_enrich_docs( channel, method, properties, body, **kwargs ):
    """RabbitMQ consumer callback: fetch one WFS feature page for a dataset
    and fan the resulting documents out to the downstream queues.

    The incoming message carries the dataset metadata plus pagination state.
    Pagination is self-scheduling: when a full page comes back, a follow-up
    task with the next offset is published for this worker / its siblings.
    The page itself is published both to the MongoDB-storage queue and to
    the processing queue. The message is acknowledged unconditionally.

    Args:
        channel, method, properties, body: standard pika callback arguments;
            *body* is a msgpack-encoded message.
        **kwargs: must provide 'credentials', 'features_per_page', 'exchange',
            'docs_to_enrich_rk', 'doc_pages_to_store_in_mongo_rk' and
            'doc_pages_to_process_rk'.
    """
    decoded_body = msgpack.unpackb(body, raw=False)
    header = decoded_body['header']
    wfs_info = header['wfs_info']
    offset = header['offset']
    session_id = header['session_id']
    dest_index = header['dest_index']

    logging.info('Enriching dataset named: %s',
                 decoded_body['body']['metadata-fr']['title'])

    feature_page = get_wfs(wfs_info, kwargs['credentials'], offset,
                           kwargs['features_per_page'])

    def _publish(routing_key, payload):
        # delivery_mode=2 makes messages persistent across broker restarts.
        channel.basic_publish(exchange=kwargs['exchange'],
                              routing_key=routing_key,
                              body=payload,
                              properties=pika.BasicProperties(delivery_mode=2))

    # A full page means at least one more page may exist: schedule the next one.
    # (get_wfs returns None on a failed request — see idiomatic 'is not None'.)
    if feature_page is not None and len(feature_page) == kwargs['features_per_page']:
        msg = {'header': {'wfs_info': wfs_info,
                          'offset': offset + kwargs['features_per_page'],
                          'session_id': session_id,
                          'dest_index': dest_index},
               'body': decoded_body['body']}
        _publish(kwargs['docs_to_enrich_rk'], msgpack.packb(msg, use_bin_type=True))

    if feature_page is not None:
        logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...',
                     len(feature_page))
        # One output document per feature: dataset metadata + 'data-fr' payload.
        doc_page = [{**decoded_body['body'], 'data-fr': feature}
                    for feature in feature_page]
        msg = {'header': {'metadata': decoded_body['body'],
                          'session_id': session_id,
                          'dest_index': dest_index},
               'body': doc_page}
        the_body = msgpack.packb(msg, use_bin_type=True)
        # Same payload goes to both queues: storage first, then processing.
        _publish(kwargs['doc_pages_to_store_in_mongo_rk'], the_body)
        _publish(kwargs['doc_pages_to_process_rk'], the_body)
        logging.info('...done!')

    channel.basic_ack(delivery_tag = method.delivery_tag)
    return
def enrich_docs( channel, method, properties, body ):
......
......@@ -27,13 +27,10 @@ def fix_field_types( in_docs, out_types ):
for in_doc in in_docs:
# metadata = in_doc['metadata-fr']
# data = in_doc['data-fr']
if 'data-fr' not in in_doc:
out_docs.append(in_doc)
continue
#
in_flattened_properties = flatten_json(in_doc['data-fr']['properties'])
out_flattened_properties = in_flattened_properties.copy()
......@@ -74,10 +71,11 @@ def fix_field_types( in_docs, out_types ):
logging.critical('type %s not supported', out_types[lookup_key])
sys.exit(1)
# pprint
out_doc = in_doc.copy()
out_doc['data-fr']['properties'] = unflatten_json(out_flattened_properties)
# amending addresses which are not in the schema.org format; we use out_doc in order to avoid reintroducing null values!
if 'address' in out_doc['data-fr']['properties'].keys() and type(out_doc['data-fr']['properties']['address']) is str:
the_street_address = in_doc['data-fr']['properties']['address']
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment