diff --git a/0-reset-session.py b/0-reset-session.py index 6c7ad9f3c6b2a9b431357c12d084c70dd482781a..f6032fcfdaefd4009bf5cce0e51f50adc801f9d1 100644 --- a/0-reset-session.py +++ b/0-reset-session.py @@ -38,7 +38,8 @@ if __name__ == '__main__': # delete this session files in the working_directory for filename in glob.glob("%s/%s*" % (cfg['session']['working_directory'], cfg['session']['id'])): - os.remove(filename) + if 'json' not in filename: + os.remove(filename) # delete MongoDB collections/records related to this session diff --git a/0-setup-session.py b/0-setup-session.py new file mode 100644 index 0000000000000000000000000000000000000000..369a8a017948dde5175b477a0bdfa6df5f41f797 --- /dev/null +++ b/0-setup-session.py @@ -0,0 +1,47 @@ +from elasticsearch import Elasticsearch + +if __name__ == '__main__': + + import yaml + + with open("config.yaml", 'r') as yamlfile: + cfg = yaml.load(yamlfile) + + es = Elasticsearch([cfg['indexer']['url']], timeout=60) + es_index = cfg['indexer']['index'] + + es_body = { "settings" : { + "number_of_shards" : 1, + "number_of_replicas" : 0, + "index.mapping.total_fields.limit": 10000, + "refresh_interval": "30s" + }, + "mappings": { + "_doc": { + "dynamic_templates": [ # priority is given by order! + { + "uuid" : { + "path_match": "uuid", + "mapping": { + "type": "keyword", + } + } + }, + { + "default" : { + "path_match": "*", + "mapping": { + "enabled": "false" + } + } + } + ] + } + } + } + #es_body.update({"mappings": {"_doc": {"dynamic_date_formats": ["strict_date_optional_time"]}}}) + + try: + rep = es.indices.create(es_index, es_body)#, wait_for_active_shards=0) + except: + pass diff --git a/1-metadata-getter.py b/1-metadata-getter.py index 3dff117bfbdaa17900ecd89040e63fbc138ba0c7..650e361c8ceed2bfd0fd5bc687b7ae0e9b095233 100644 --- a/1-metadata-getter.py +++ b/1-metadata-getter.py @@ -14,7 +14,7 @@ def filter_function( x, the_uuids_to_filter_out ): return x['geonet:info']['uuid'] not in the_uuids_to_filter_out # GEONETWORK METADATA GETTER -def get_pages( root_url, no_records_per_page, uuid=None, the_filter=None ): +def get_pages( root_url, no_records_per_page, uuid=None, the_filter=None, username=None, password=None ): params = {} @@ -38,13 +38,16 @@ def get_pages( root_url, no_records_per_page, uuid=None, the_filter=None ): logging.debug("Get metadata pages, from record no. %s to record no. %s." % (params['from'],params['to'])) - res = requests.get(root_url, params=params) + if username != None and password != None: + res = requests.get(root_url, params=params, auth=(username, password)) + else: + res = requests.get(root_url, params=params) logging.debug(res.url) try: res.json()['metadata'] except KeyError as e: - raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists?' % uuid) + raise RecordNotFound('The record with uuid=%s was not found! Are you sure that it actually exists and that you have the proper access rights?' 
% uuid) if type(res.json()['metadata']) is list: records = res.json()['metadata'] @@ -128,18 +131,18 @@ def main(cfg): uuids_to_get = cfg['metadata_getter']['uuids_to_get'] uuids_to_filter_out = cfg['metadata_getter']['uuids_to_filter_out'] - - #print(uuids_to_get) + username = cfg['geonetwork']['username'] + password = cfg['geonetwork']['password'] if 'all' not in uuids_to_get: for uuid_to_get in uuids_to_get: - for page in get_pages( cfg['geonetwork']['url'], cfg['geonetwork']['records_per_page'], uuid=uuid_to_get, the_filter=uuids_to_filter_out ): + for page in get_pages( cfg['geonetwork']['url'], cfg['geonetwork']['records_per_page'], uuid=uuid_to_get, the_filter=uuids_to_filter_out, username=username, password=password ): send_page(cfg['session']['id'], cfg['geonetwork']['url'], cfg['indexer']['index'], page, channel, exchange, queue_name) else: - for page in get_pages( cfg['geonetwork']['url'], cfg['geonetwork']['records_per_page'], uuid=None, the_filter=uuids_to_filter_out ): + for page in get_pages( cfg['geonetwork']['url'], cfg['geonetwork']['records_per_page'], uuid=None, the_filter=uuids_to_filter_out, username=username, password=password ): send_page(cfg['session']['id'], cfg['geonetwork']['url'], cfg['indexer']['index'], page, channel, exchange, queue_name) diff --git a/2-metadata-processor.py b/2-metadata-processor.py index 3635f19058fd51b2220f6dce8ba49a2ba022b9b9..49a9ad0aa250bba92e8753546c8027e785d41daa 100644 --- a/2-metadata-processor.py +++ b/2-metadata-processor.py @@ -89,13 +89,20 @@ def list_to_dictlist( the_input, the_context=None ): if the_context == 'responsibleParty': try: + # the following applies to legacy metadata parent_organisation, child_organisation = out_item['organisationName'].split('/') parent_organisation = parent_organisation.strip() child_organisation = child_organisation.strip() except: - parent_organisation, child_organisation = out_item['organisationName'], None - parent_organisation = parent_organisation.strip() - child_organisation = None + try: + # the following applies to Dublin Core metadata + my_re = re.compile(r"(?P<organisationName>[^\(\)]+)(\((?P<individualName>.*)\))") + parent_organisation = my_re.match(out_item['organisationName']).groupdict()['organisationName'].strip() + child_organisation = my_re.match(out_item['organisationName']).groupdict()['individualName'].strip() + except: + parent_organisation, child_organisation = out_item['organisationName'], None + parent_organisation = parent_organisation.strip() + child_organisation = None out_item['organisationName'] = parent_organisation @@ -111,18 +118,18 @@ def list_to_dictlist( the_input, the_context=None ): if the_context == 'geoBox': polygon = [] - polygon.append([the_output[0]['westBoundLongitude'], the_output[0]['southBoundLatitude']]) - polygon.append([the_output[0]['eastBoundLongitude'], the_output[0]['southBoundLatitude']]) - polygon.append([the_output[0]['eastBoundLongitude'], the_output[0]['northBoundLatitude']]) - polygon.append([the_output[0]['westBoundLongitude'], the_output[0]['northBoundLatitude']]) - polygon.append([the_output[0]['westBoundLongitude'], the_output[0]['southBoundLatitude']]) + polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) + polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) + polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['northBoundLatitude'])]) + polygon.append([float(the_output[0]['westBoundLongitude']), 
float(the_output[0]['northBoundLatitude'])]) + polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])]) the_output = {'type': 'Polygon', 'coordinates': [polygon]} return the_output -def process_records( in_records, geonetwork_root_url, working_directory ): +def process_records( in_records, geonetwork_root_url, working_directory, credentials ): #print( in_records[0].keys() ) @@ -148,7 +155,7 @@ def process_records( in_records, geonetwork_root_url, working_directory ): #exit(1) del out_record['metadata-fr']['link'] tmp = list_to_dictlist(in_record['link'], 'link')#links - out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp), working_directory ) + out_record['metadata-fr']['link'] = enrich_links( fix_links(tmp, credentials), working_directory ) if 'userinfo' in out_record['metadata-fr'].keys(): del out_record['metadata-fr']['userinfo'] @@ -170,6 +177,13 @@ def process_records( in_records, geonetwork_root_url, working_directory ): del out_record['metadata-fr']['legalConstraints'] out_record['metadata-fr']['legalConstraints'] = [in_record['legalConstraints']] + # adding a 'license' field + out_record['metadata-fr']['license'] = 'unknown' + if 'legalConstraints' in out_record['metadata-fr'].keys(): + for el in out_record['metadata-fr']['legalConstraints']: + if "licence" in el.lower(): + out_record['metadata-fr']['license'] = el + if 'resourceConstraints' in out_record['metadata-fr'].keys() and type(out_record['metadata-fr']['resourceConstraints']) is str: del out_record['metadata-fr']['resourceConstraints'] out_record['metadata-fr']['resourceConstraints'] = [in_record['resourceConstraints']] @@ -208,7 +222,10 @@ def process_records( in_records, geonetwork_root_url, working_directory ): # let's delete some attributes which are very specific to GeoNetwork - attribs_to_delete = ['userinfo', 'isHarvested', 'isTemplate', 'owner', 'displayOrder', 'publishedForGroup', 'valid'] + attribs_to_delete = ['userinfo', 'isHarvested', 'isTemplate', 'owner', \ + 'displayOrder', 'publishedForGroup', 'valid', 'docLocale', \ + 'popularity', 'mdLanguage', 'root', 'rating', 'source', \ + 'defaultTitle', 'datasetLang', 'geoDesc', 'locale', 'logo'] for attrib in attribs_to_delete: try: @@ -219,9 +236,9 @@ def process_records( in_records, geonetwork_root_url, working_directory ): if 'idxMsg' in in_record.keys(): del out_record['metadata-fr']['idxMsg'] - # let's delete duplicates in the 'updateFrequency' attribute + # let's take just one value among those that are in the updateFrequency (hoping that it is representative...) 
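# --------------------------------------------------------------------------------------------
# Illustrative sketch only, not part of this commit: the geoBox handling above boils down to
# turning the four bounding-box corners returned by GeoNetwork into a GeoJSON Polygon whose
# coordinates are floats (GeoJSON requires numbers, not strings). The helper name and the
# standalone form are hypothetical.
def bbox_to_geojson_polygon(geobox):
    # geobox is assumed to look like:
    # {'westBoundLongitude': '4.76', 'southBoundLatitude': '45.55',
    #  'eastBoundLongitude': '5.10', 'northBoundLatitude': '45.94'}
    west = float(geobox['westBoundLongitude'])
    south = float(geobox['southBoundLatitude'])
    east = float(geobox['eastBoundLongitude'])
    north = float(geobox['northBoundLatitude'])
    # closed ring: SW -> SE -> NE -> NW -> SW
    ring = [[west, south], [east, south], [east, north], [west, north], [west, south]]
    return {'type': 'Polygon', 'coordinates': [ring]}
# --------------------------------------------------------------------------------------------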
if 'updateFrequency' in in_record.keys() and type(in_record['updateFrequency']) is list: - out_record['metadata-fr']['updateFrequency'] = list(set(in_record['updateFrequency'])) + out_record['metadata-fr']['updateFrequency'] = in_record['updateFrequency'][0] # let's generate the href of this document local_params = OrderedDict() @@ -255,7 +272,7 @@ def process_page( channel, method, properties, body, **kwargs): page = decoded_body['body'] - out_records = process_records( page, geonetwork_root_url, kwargs['working_directory'] ) + out_records = process_records( page, geonetwork_root_url, kwargs['working_directory'], kwargs['credentials'] ) #print(json.dumps(out_records[0], indent=4)) #dispatch @@ -271,8 +288,8 @@ def process_page( channel, method, properties, body, **kwargs): if 'link' in metadata_record['metadata-fr'].keys(): for link in metadata_record['metadata-fr']['link']: - - if 'service' in link.keys() and link['service'] == 'WFS': + #TODO: generalize! + if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']: logging.debug('EUREKA : found a WFS ressource!') wfs_found = True #documents_to_index = [] @@ -282,6 +299,7 @@ def process_page( channel, method, properties, body, **kwargs): full_version['uuid'] = metadata_record['metadata-fr']['geonet:info']['uuid'] + '.full' full_version['type'] = metadata_record['metadata-fr']['type'] + msg = {'header': {'wfs_info': link, 'offset': 0, 'session_id': session_id, 'dest_index': dest_index}, 'body': full_version} the_body = msgpack.packb(msg, use_bin_type=True) @@ -338,6 +356,7 @@ def main(cfg): channel.queue_bind(exchange=exchange, queue=docs_to_enrich_qn, routing_key=docs_to_enrich_rk) working_directory = cfg['session']['working_directory'] + credentials = cfg['credentials'] #logging.info('Waiting for messages...') @@ -347,7 +366,8 @@ def main(cfg): exchange=exchange, docs_to_index_rk=docs_to_index_rk, docs_to_enrich_rk=docs_to_enrich_rk, - working_directory=working_directory), + working_directory=working_directory, + credentials=credentials), queue=metadata_pages_to_process_qn)#, no_ack=True) channel.start_consuming() diff --git a/3-doc-enricher.py b/3-doc-enricher.py index affbc71060c8e4ed7158c8c180ffebc5497980cb..f2a9194efdcf47c1360cf17311d4f76e0ec2a763 100644 --- a/3-doc-enricher.py +++ b/3-doc-enricher.py @@ -2,13 +2,56 @@ import pika import msgpack import requests import json +import datetime +#from decimal import Decimal from utils.exit_gracefully import exit_gracefully from utils.my_logging import logging +from utils.postgis_helper import Remote +from utils.serializers import encode_datetime +from sqlalchemy.exc import NoSuchTableError -def get_wfs( link, offset=0, no_features_per_page=1000 ): +def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ): + + dbname = link['url'].split('/')[-1] + schema, table_name = link['name'].split('.') + + logging.info('Getting data from database %s...' 
% dbname) + logging.info('Establishing a database connection...') + pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password']) + logging.info('Done.') + + try: + table = pg.get_table(table_name, schema=schema) + except NoSuchTableError: + logging.debug('Table %s in schema % s not found :-(' % (table_name, schema)) + return + + count = pg.count_entries(table) + + no_pages = count//no_features_per_page+1 + logging.info('Getting %i entries in %i pages from table %s.%s...'% (count, no_pages, schema, table)) + + feature_page = [] # we accumulate entries in this sort of buffer + cnt = 0 + for entry in pg.get_entries(table): + feature_page.append(entry) + if len(feature_page) == no_features_per_page: + cnt += 1 + print('YIELDING PAGE %i/%i' % (cnt, no_pages)) + yield feature_page + feature_page = [] + + print('YIELDING LAST PAGE', len(feature_page)) + yield feature_page # this will be the last feature_page + + return + + +def get_wfs( link, credentials, offset=0, no_features_per_page=1000 ): root_url = link['url'] + print(offset, no_features_per_page) params = {} params['version'] = '2.0.0' @@ -29,7 +72,19 @@ def get_wfs( link, offset=0, no_features_per_page=1000 ): params['startindex'] = offset #0 + cnt*no_features_per_page #params['to'] = params['from'] + no_records_per_page - 1 - res = requests.get(root_url, params = params) + with_credentials = False + for domain in credentials: + if domain in link['url'] and credentials[domain]['username'] != None and credentials[domain]['password']: + logging.info('Found a valid credential.') + with_credentials = True + username = credentials[domain]['username'] + password = credentials[domain]['password'] + break + + if with_credentials: + res = requests.get(root_url, params = params, auth=(username, password)) + else: + res = requests.get(root_url, params = params) logging.debug(res.url) @@ -66,7 +121,7 @@ def get_wfs( link, offset=0, no_features_per_page=1000 ): -def enrich_docs( channel, method, properties, body, **kwargs ): +def old_enrich_docs( channel, method, properties, body, **kwargs ): decoded_body = msgpack.unpackb(body, raw=False) @@ -79,7 +134,7 @@ def enrich_docs( channel, method, properties, body, **kwargs ): logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title']) - feature_page = get_wfs(wfs_info, offset, kwargs['features_per_page']) + feature_page = get_wfs(wfs_info, kwargs['credentials'], offset, kwargs['features_per_page']) # we implement pagination by letting this program creating tasks for itself / its siblings if feature_page != None and len(feature_page) == kwargs['features_per_page']: # at least another page is needed @@ -99,7 +154,6 @@ def enrich_docs( channel, method, properties, body, **kwargs ): # #for feature_page in feature_pages: # #print(feature_page[0]['properties']['nom_reduit']) logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' 
% len(feature_page)) - doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page] msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page} @@ -134,6 +188,73 @@ def enrich_docs( channel, method, properties, body, **kwargs ): return #out_docs +def enrich_docs( channel, method, properties, body, **kwargs ): + + + + + decoded_body = msgpack.unpackb(body, raw=False) + + wfs_info = decoded_body['header']['wfs_info'] + offset = decoded_body['header']['offset'] + session_id = decoded_body['header']['session_id'] + dest_index = decoded_body['header']['dest_index'] + + + logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title']) + + #feature_page = get_entries_from_postgis(wfs_info, kwargs['postgis_cfg']) + + #if feature_page != None: + #print('here') + for feature_page in get_entries_from_postgis(wfs_info, kwargs['postgis_cfg']): + + if len(feature_page) == 0: + logging.debug('Empty page!') + continue + #print('here') + logging.info('Sending feature page of len = %i to RabbitMQ...' % len(feature_page)) + # for el in feature_page: + # for key, v in el.items(): + # print(key, v) + #print("LEN", len(feature_page)) + #continue + + doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page] + + msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page} + #print('here') + the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime) + #print('there') + + # channel.basic_publish( exchange=kwargs['exchange'], + # routing_key=kwargs['doc_pages_to_store_in_mongo_rk'], + # body=the_body, + # properties=pika.BasicProperties(delivery_mode = 2) + # ) + + + channel.basic_publish( exchange=kwargs['exchange'], + routing_key=kwargs['doc_pages_to_process_rk'], + body=the_body, + properties=pika.BasicProperties(delivery_mode = 2) + ) + + logging.info('...done!') + + # except TypeError: # it means that getWFS returned None + # pass + # # + # #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + + + + channel.basic_ack(delivery_tag = method.delivery_tag) + #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1) + + return #out_docs + + def main(cfg): from utils.close_connection import on_timeout @@ -172,7 +293,8 @@ def main(cfg): doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk, docs_to_enrich_rk=docs_to_enrich_rk, doc_pages_to_process_rk=doc_pages_to_process_rk, - features_per_page=cfg['wfs']['features_per_page']), + features_per_page=cfg['wfs']['features_per_page'], + postgis_cfg=cfg['postgis']), queue=docs_to_enrich_qn)#, no_ack=True) channel.start_consuming() connection.close() diff --git a/5-pg-field-type-detector.py b/5-pg-field-type-detector.py index f3c85c6ce2344884cfb5c202380b584204ce4463..10c5927a695cd0284b5a9c392c0d9a4946330949 100644 --- a/5-pg-field-type-detector.py +++ b/5-pg-field-type-detector.py @@ -1,27 +1,12 @@ import sys import json import re -import pika -import itertools import time -from sqlalchemy import create_engine -from sqlalchemy.engine import reflection -from sqlalchemy import MetaData -from sqlalchemy import select -from sqlalchemy import Table - -# Then load the Geometry type -from geoalchemy2 import Geometry # noqa - - from utils.flatten_utils import flatten_json, unflatten_json from utils.type_utils import detect_type, detect_type_new -#from pymongo import MongoClient from utils.exit_gracefully import exit_gracefully from 
utils.my_logging import logging -#from tqdm import tqdm - from utils.postgis_helper import Remote @@ -182,7 +167,7 @@ def elect_field_type( data ): return result -def generate_field_catalog( cfg, pg, catalog=None ): +def generate_field_catalog( cfg, catalog=None ): if catalog == None or catalog == dict(): output = {} @@ -191,86 +176,120 @@ def generate_field_catalog( cfg, pg, catalog=None ): else: output = catalog.copy() - # selected_schema = "rdata" - # selected_table = selected_schema + ".sit_sitra.sittourisme" - # found = False - logging.info('Getting schemas...') - schema_names = pg.get_schema_names() - logging.info('Done.') + dbnames = cfg['postgis']['databases'].keys() t1 = time.time() - #print(schema_names) - - for schema_name in schema_names: - # if schema_name != selected_schema: - # continue - # print(schema_name) - for table in pg.get_tables(schema_name): - # if str(table) != selected_table: - # continue - # else: - # found = True - try: - count = pg.count_entries(table) - except: - count = 'unknown no. of' - #print(count) - - db_schema_table = '%s/%s' % (pg.dbname, table) - t2 = time.time() - logging.info('Analyzing table %s (%s records)...' % (db_schema_table, count)) - logging.info('| %i records were analyzed so far (%.2f records/s)' % (output['analyzed_docs'], output['analyzed_docs']/(t2-t1))) - # print(table) - # columns, geom = pg.get_columns(table) - # fields = [table.c[col.name] for col in columns] - # selected = select(fields) - # #detected = {} - # for entry in pg.engine.execute(selected): - # print(entry) - # - # exit(0) - - for doc in pg.get_entries(table): - - properties = doc['properties'] - flattened_properties = flatten_json(properties) - - # --------------------------------------------------------------------------------------------- - for k, v in flattened_properties.items(): - - # generate equivalent element kk out of k (in the sense of equivalence classes, cf. https://en.wikipedia.org/wiki/Equivalence_class) - # ex.: openinghoursspecification.XX.validThrough -> openinghoursspecification.0.validThrough - # N.B.: integers like .0, .1, .2, ... are inserted by the flatten_json function whenever a list is found... - kk = re.sub(r'\.\d+', '.0', k) + for dbname in dbnames: + logging.info('Analyzing database %s...' 
% dbname) + logging.info('Establishing a database connection...') + pg = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password']) + logging.info('Done.') + #field_catalog = generate_field_catalog( cfg, pg_connection, field_catalog ) - if kk not in output['fields'].keys(): - output['fields'][kk] = {} + if isinstance(cfg['postgis']['databases'][dbname], dict) and 'whitelist' in cfg['postgis']['databases'][dbname].keys(): + schema_dot_table_whitelist = cfg['postgis']['databases'][dbname]['whitelist'] + print(schema_dot_table_whitelist) + schema_whitelist = [ x.split('.')[0] for x in schema_dot_table_whitelist ] + print(schema_whitelist) - if db_schema_table not in output['fields'][kk].keys(): - output['fields'][kk][db_schema_table] = {'types': dict()} #'title': dataset_title, 'types': []} + else: + schema_dot_table_whitelist = None + schema_whitelist = None - detected_type = detect_type(v) + #print(whitelist) - if detected_type not in output['fields'][kk][db_schema_table]['types'].keys(): - #print('was here, too') - output['fields'][kk][db_schema_table]['types'][detected_type] = 0 + # selected_schema = "sit_sitra" + # selected_table = selected_schema + ".sittourisme" + # found = False - output['fields'][kk][db_schema_table]['types'][detected_type] += 1 + logging.info('Getting schemas...') + schema_names = pg.get_schema_names() + logging.info('Done.') - output['analyzed_docs'] += 1 - #print(output['analyzed_docs']) - # useful for debugging: - if cfg['field_type_detector']['debug'] and output['analyzed_docs'] > 1000: - return output + #print(schema_names) - # if found == True: - # break - # - # if found == True: - # break + for schema_name in schema_names: + if schema_whitelist is not None: + if schema_name not in schema_whitelist: + logging.debug('Skipping schema %s' % schema_name) + continue + # if schema_name != selected_schema: + # continue + # print(schema_name) + for table in pg.get_tables(schema_name): + # print(schema_name) + # if str(table) != selected_table: + # continue + # else: + # found = True + if schema_dot_table_whitelist is not None: + if str(table) not in schema_dot_table_whitelist: + logging.debug('Skipping table %s' % str(table)) + continue + + try: + count = pg.count_entries(table) + except: + count = 'unknown no. of' + #print(count) + + db_schema_table = '%s/%s' % (pg.dbname, table) + t2 = time.time() + logging.info('Analyzing table %s (%s records)...' % (db_schema_table, count)) + logging.info('| %i records were analyzed so far (%.2f records/s)' % (output['analyzed_docs'], output['analyzed_docs']/(t2-t1))) + # print(table) + # columns, geom = pg.get_columns(table) + # fields = [table.c[col.name] for col in columns] + # selected = select(fields) + # #detected = {} + # for entry in pg.engine.execute(selected): + # print(entry) + # + # exit(0) + + for doc in pg.get_entries(table): + + properties = doc['properties'] + + flattened_properties = flatten_json(properties) + + # --------------------------------------------------------------------------------------------- + for k, v in flattened_properties.items(): + + # generate equivalent element kk out of k (in the sense of equivalence classes, cf. https://en.wikipedia.org/wiki/Equivalence_class) + # ex.: openinghoursspecification.XX.validThrough -> openinghoursspecification.0.validThrough + # N.B.: integers like .0, .1, .2, ... are inserted by the flatten_json function whenever a list is found... 
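# --------------------------------------------------------------------------------------------
# Illustrative sketch only, not part of this commit: the equivalence-class normalization that
# the comment above describes (and that the line below applies) collapses every list index in a
# flattened key down to 0, so that e.g. 'openinghoursspecification.3.validThrough' and
# 'openinghoursspecification.17.validThrough' are counted as the same field when the per-field
# type statistics are accumulated. The function name is hypothetical.
import re

def normalize_flattened_key(key):
    return re.sub(r'\.\d+', '.0', key)

# normalize_flattened_key('openinghoursspecification.12.validThrough')
# -> 'openinghoursspecification.0.validThrough'
# --------------------------------------------------------------------------------------------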
+ kk = re.sub(r'\.\d+', '.0', k) + + if kk not in output['fields'].keys(): + output['fields'][kk] = {} + + if db_schema_table not in output['fields'][kk].keys(): + output['fields'][kk][db_schema_table] = {'types': dict()} #'title': dataset_title, 'types': []} + + detected_type = detect_type(v) + + if detected_type not in output['fields'][kk][db_schema_table]['types'].keys(): + #print('was here, too') + output['fields'][kk][db_schema_table]['types'][detected_type] = 0 + + output['fields'][kk][db_schema_table]['types'][detected_type] += 1 + + output['analyzed_docs'] += 1 + #print(output['analyzed_docs']) + + # useful for debugging: + if cfg['field_type_detector']['debug'] and output['analyzed_docs'] > 1000: + return output + + # if found == True: + # break + # + # if found == True: + # break return output @@ -290,16 +309,17 @@ def main(cfg): raise Exception("(Some of the) output files are already present, and rewrite is disabled!") - dbnames = cfg['postgis']['databases'] - field_catalog = {} + # dbnames = cfg['postgis']['databases'] + # field_catalog = {} logging.info('Building catalog...') - for dbname in dbnames: - logging.info('Analyzing database %s...' % dbname) - logging.info('Establishing a database connection...') - pg_connection = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password']) - logging.info('Done.') - field_catalog = generate_field_catalog( cfg, pg_connection, field_catalog ) + # for dbname in dbnames: + # logging.info('Analyzing database %s...' % dbname) + # logging.info('Establishing a database connection...') + # pg_connection = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password']) + # logging.info('Done.') + #field_catalog = {} + field_catalog = generate_field_catalog( cfg )#, field_catalog ) logging.info("Catalog: built. %i docs were analyzed. 
" % field_catalog['analyzed_docs']) # writing results to disk diff --git a/6-doc-processor.py b/6-doc-processor.py index 84097537077668a3c4699e44e7e33f1b0683fe68..38d1b2d3240ec57e986a42bd06103d78198dc41c 100644 --- a/6-doc-processor.py +++ b/6-doc-processor.py @@ -2,14 +2,17 @@ import pika import msgpack #from pprint import pprint #from dateutil.parser import parse +import datetime import re import json +import sys from utils.flatten_utils import flatten_json, unflatten_json #import pytz #import sys -from utils.type_utils import convert_to_datetime, convert_to_float, convert_to_int, convert_to_str +from utils.type_utils import convert_to_datetime, convert_to_float, convert_to_int, convert_to_str, convert_to_boolean from utils.exit_gracefully import exit_gracefully from utils.my_logging import logging +from utils.serializers import decode_datetime @@ -45,7 +48,7 @@ def fix_field_types( in_docs, out_types ): elif out_types[lookup_key] == 'float': out_flattened_properties[prop] = convert_to_float(in_flattened_properties[prop]) elif out_types[lookup_key] in ['date', 'datetime']: - out_flattened_properties[prop] = convert_to_datetime(in_flattened_properties[prop]).strftime('%Y-%m-%dT%H:%M:%SZ') + out_flattened_properties[prop] = convert_to_datetime(in_flattened_properties[prop]).strftime('%Y-%m-%dT%H:%M:%S.%fZ') elif out_types[lookup_key] == 'bool': out_flattened_properties[prop] = convert_to_boolean(in_flattened_properties[prop]) else: @@ -70,7 +73,10 @@ def fix_field_types( in_docs, out_types ): def process_docs( channel, method, properties, body, **kwargs ): - decoded_body = msgpack.unpackb(body, raw=False) + + decoded_body = msgpack.unpackb(body, raw=False, object_hook=decode_datetime) + #print(decoded_body) + #exit(0) the_dest_index = decoded_body['header']['dest_index'] docs = decoded_body['body'] diff --git a/7-doc-indexer.py b/7-doc-indexer.py index 6285613b139292f1a352c83f2c305f69d1053676..853eb6f680aef5ea5c7149c14fade643ba023605 100644 --- a/7-doc-indexer.py +++ b/7-doc-indexer.py @@ -20,7 +20,7 @@ def tag_doc( the_doc ): # tag_dict[tag] = False # isOpen? - if not 'legalConstraints' in the_doc['metadata-fr'].keys() or not any( [x in the_doc['metadata-fr']['legalConstraints'] for x in ['Licence Associée', 'Licence Engagée'] ] ): + if 'license' in the_doc['metadata-fr'].keys() and not any( [x in the_doc['metadata-fr']['license'] for x in ['Licence Associée', 'Licence Engagée'] ] ): tag_dict['isOpenAccess'] = True else: tag_dict['isOpenAccess'] = False @@ -49,24 +49,29 @@ def tag_doc( the_doc ): if 'data-fr' in the_doc.keys(): tag_dict['isSearchable'] = True - # init - tag_dict['isPunctual'] = False - tag_dict['isLinear'] = False - tag_dict['isAreal'] = False + if 'geometry' in the_doc['data-fr'].keys(): - # isPunctual? - if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Point', 'MultiPoint']] ): - tag_dict['isPunctual'] = True + # init + tag_dict['isPunctual'] = False + tag_dict['isLinear'] = False + tag_dict['isAreal'] = False - # isLinear? - if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['LineString', 'MultiLineString']] ): - tag_dict['isLinear'] = True + # isPunctual? + if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Point', 'MultiPoint']] ): + tag_dict['isPunctual'] = True - # isAreal? - if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Polygon', 'MultiPolygon']] ): - tag_dict['isAreal'] = True + # isLinear? 
+ if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['LineString', 'MultiLineString']] ): + tag_dict['isLinear'] = True - tagged_doc = {'editorial-metadata-en': tag_dict, **the_doc} + # isAreal? + if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Polygon', 'MultiPolygon']] ): + tag_dict['isAreal'] = True + + # isSample? docs that are tagged by this script are never just a sample + tag_dict['isSample'] = False + + tagged_doc = {'editorial-metadata': tag_dict, **the_doc} return tagged_doc @@ -87,42 +92,42 @@ def index_docs(channel, method, properties, body, es): #print(docs_to_index) - es_index = decoded_body['header']['index']['_index'] - es_body = { "settings" : { - "number_of_shards" : 1, - "number_of_replicas" : 0, - "index.mapping.total_fields.limit": 10000, - "refresh_interval": "30s" - }, - "mappings": { - "_doc": { - "dynamic_templates": [ # priority is given by order! - { - "uuid" : { - "path_match": "uuid", - "mapping": { - "type": "keyword", - } - } - }, - { - "default" : { - "path_match": "*", - "mapping": { - "enabled": "false" - } - } - } - ] - } - } - } - #es_body.update({"mappings": {"_doc": {"dynamic_date_formats": ["strict_date_optional_time"]}}}) - - try: - rep = es.indices.create(es_index, es_body)#, wait_for_active_shards=0) - except: - pass + # es_index = decoded_body['header']['index']['_index'] + # es_body = { "settings" : { + # "number_of_shards" : 1, + # "number_of_replicas" : 0, + # "index.mapping.total_fields.limit": 10000, + # "refresh_interval": "30s" + # }, + # "mappings": { + # "_doc": { + # "dynamic_templates": [ # priority is given by order! + # { + # "uuid" : { + # "path_match": "uuid", + # "mapping": { + # "type": "keyword", + # } + # } + # }, + # { + # "default" : { + # "path_match": "*", + # "mapping": { + # "enabled": "false" + # } + # } + # } + # ] + # } + # } + # } + # #es_body.update({"mappings": {"_doc": {"dynamic_date_formats": ["strict_date_optional_time"]}}}) + # + # try: + # rep = es.indices.create(es_index, es_body)#, wait_for_active_shards=0) + # except: + # pass logging.info("Pushing %i documents to Elasticsearch..." 
% len(docs_to_index)) diff --git a/8-reindexer.py b/8-reindexer.py index d4d030065981dd6a5f9c7c0f6a325d693877e16e..5c5388c7e2c647ab88a0c6500127af72c83b263f 100644 --- a/8-reindexer.py +++ b/8-reindexer.py @@ -13,7 +13,9 @@ def main(cfg, wait=True): from es_template import template template['index_patterns'] = [ cfg['reindexer']['destination_index'] ] - + template['settings']['number_of_shards'] = cfg['reindexer']['number_of_shards'] + template['settings']['number_of_replicas'] = cfg['reindexer']['number_of_replicas'] + if wait: if 'source_url' in cfg['reindexer'].keys(): @@ -49,7 +51,7 @@ def main(cfg, wait=True): #"remote": { "host": cfg['reindexer']['source_url'] }, "index": cfg['reindexer']['source_index'], "type": "_doc", - "size": 100 + "size": 1000 }, "dest": { "index": cfg['reindexer']['destination_index'], @@ -58,10 +60,10 @@ def main(cfg, wait=True): } if 'source_url' in cfg['reindexer'].keys(): - body['source']['remote'] = cfg['reindexer']['source_url'] + body['source']['remote'] = {'host': cfg['reindexer']['source_url']} logging.debug(body) - + rep = es.reindex(body, wait_for_completion=False) #, slices=24)#, timeout='120s')# timeout='120s', slices=1000, requests_per_second=-1)#, timeout='2m') logging.info(rep) diff --git a/README.md b/README.md index 11d31ba87e0dd477869e9927adb5e70de8b202a1..2542e58a6eb64d7327870270d64027d8bbc2687a 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ N.B.: Steps 6-12 can also be performed at the same time, in separate terminals. ## TODO -* implementing the authenticated access to (meta)data sources; extracting a small sample of restricted access datasets out of the "full" documents, to be used as a teaser for the not-yet-authorized user +* extracting a small sample of restricted access datasets out of the "full" documents, to be used as a teaser for the not-yet-authorized user * incremental updates * the field type detection takes a lot of time: can it be optimized? 
* logging, reporting diff --git a/config.template.yaml b/config.template.yaml index 725694055c64f7c3c0118d93d8827ff9c97d47ff..781ccd783542a77a5cdd0cdf4ff5255b90ba8c2d 100644 --- a/config.template.yaml +++ b/config.template.yaml @@ -40,9 +40,13 @@ postgis: password: <the_password> host: <the_hostname> databases: - - <1st database to analyze> - - <2nd database to analyze> - - <...> + <1st database to analyze>: + <2nd database to analyze>: + whitelist: + - <1st schema.table to analyze> + - <2nd schema.table to analyze> + - <...> + <...>: metadata_getter: uuids_to_get: @@ -71,3 +75,5 @@ reindexer: destination_index: <ex.: download.data.grandlyon.com.v2> template_name: <ex.: download.data.grandlyon.com.v2> template_index_pattern: <ex.: download.data.grandlyon.com.v2> + number_of_shards: <the_number_of_shards_of_the_destination_index> + number_of_replicas: <the_number_of_replicas_of_the_destination_index> diff --git a/es_template.py b/es_template.py index fb5706fb2755a5509b27afad82106e8bd95de3fb..aed13472ca03049dbd9bc7e3ae9a839a6a6bd9d5 100644 --- a/es_template.py +++ b/es_template.py @@ -4,8 +4,9 @@ template = { "settings" : { "index.mapping.total_fields.limit": 10000, #"index.mapping.ignore_malformed": True, - "number_of_shards" : 1, - "number_of_replicas" : 0, + # "number_of_shards" : 48, + # "number_of_replicas" : 0, + "refresh_interval" : -1, "max_ngram_diff": 100, "analysis": { "filter": { @@ -112,12 +113,28 @@ template = { } } }, + # { + # "link-template" : { + # "path_match": "metadata-fr.link", + # "mapping": { + # #"type": "nested", + # "index": "false" + # #"ignore_malformed": True + # } + # } + # }, { - "link-template" : { - "path_match": "metadata-fr.link", + "keyword-template" : { + "match_pattern": "regex", + "path_match": ".*md5.*|metadata-fr\.link\.formats.*|metadata-fr\.link\.service.*|metadata-fr\.parentId.*", "mapping": { - #"type": "nested", - "index": "false" + "type": "text", + "index": False, + "fields": { + "keyword": { + "type": "keyword" + } + } #"ignore_malformed": True } } @@ -152,14 +169,28 @@ template = { } }, { - "unindexed-path-template": { + "unindexed-path-template-1": { "match_pattern": "regex", "match_mapping_type": "*", - "path_match": "metadata-fr\.href.*|metadata-fr\.idxMsg.*|data-fr\.geometry\..*|metadata-fr\.identifier.*|metadata-fr\.geonet\:info\.@xmlns:geonet|metadata-fr\.responsibleParty\.logo|metadata-fr\.image\..*|.*url|metadata-fr\.link\.name", + "path_match": "metadata-fr\.href.*|metadata-fr\.idxMsg.*|data-fr\.geometry\..*|metadata-fr\.identifier.*|metadata-fr\.geonet\:info.*:geonet|metadata-fr\.responsibleParty\.logo|metadata-fr\.image\..*|.*url|metadata-fr\.link\.name", # "match": "(metadata-fr\.image.*|data-fr\.geometry.*|metadata-fr\.href.*|metadata-fr\.idxMsg.*)", "mapping": { "type": "text", - #"ignore_malformed": True + #"ignore_malformed": True, + "index": False + } + } + }, + { + "unindexed-path-template-2": { + "match_pattern": "regex", + "match_mapping_type": "*", + "path_match": "metadata-fr\.link\.projections|metadata-fr\.link\.content-type|metadata-fr\.link\.content-length", + # "match": "(metadata-fr\.image.*|data-fr\.geometry.*|metadata-fr\.href.*|metadata-fr\.idxMsg.*)", + # TODO: ignore this one -> metadata-fr\.link\.bbox_by_projection + "mapping": { + "type": "text", + #"ignore_malformed": True, "index": False } } diff --git a/requirements.txt b/requirements.txt index 8f9fe8c9003666d2b40f086473fc0c59f1783f1b..b7eae43e7a682f97eeccc121619bf53964968aa2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,6 +8,7 @@ pymongo 
#tqdm # Elasticsearch 6.x elasticsearch>=6.0.0,<7.0.0 -GeoAlchemy2>=0.5.0 +# N.B.: GeoAlchemy2 0.6.1 does not accept POINTZ, MULTILINESTRINGZ, ... geometries +GeoAlchemy2==0.5.0 psycopg2-binary>=2.7.0 sqlalchemy>=1.2.0,<1.3.0 diff --git a/utils/close_connection.py b/utils/close_connection.py index bbb62bdf83581dfbdffea08b41ffe6969f7fe29e..3a6c0925a9efc2385f36a57d92fc8853d2b59ba0 100644 --- a/utils/close_connection.py +++ b/utils/close_connection.py @@ -1,4 +1,4 @@ -#import sys +import sys from .my_logging import logging def on_timeout(the_connection): @@ -8,6 +8,7 @@ def on_timeout(the_connection): # global connection logging.info('Waiting for messages...') the_connection.close() + #logging.info('Exiting.') #sys.exit(0) #return diff --git a/utils/fix_links.py b/utils/fix_links.py index 2261dc99fa0b3ff6d28e15bb1d2c1523604d0548..0b8160f65180d840bb624b25705ce3c4e2c7fd2d 100644 --- a/utils/fix_links.py +++ b/utils/fix_links.py @@ -2,6 +2,8 @@ import re from pprint import pprint import json import requests +import urllib.request +import base64 from .my_logging import logging #import logging @@ -11,7 +13,7 @@ def translate_content_type( content_type ): output = content_type # TODO: complete the following list! - types = ['pdf', 'html', 'zip', 'xml', 'javascript', 'json', 'csv'] + types = ['pdf', 'html', 'zip', 'xml', 'javascript', 'json', 'csv', 'tiff'] for the_type in types: if the_type in content_type: @@ -51,6 +53,12 @@ def protocol_to_formats_and_services( links ): elif link['protocol'] == 'SOS': output[k]['formats'] = ['JSON', 'XML'] output[k]['service'] = 'SOS' + elif link['protocol'] == 'HTML': + # in order to prevent HTML ressources to be deemed as downloadable + pass + elif link['protocol'].startswith("WWW:"): + # in order to prevent HTML ressources to be deemed as downloadable + pass else: output[k]['formats'] = [ link['protocol'] ] @@ -59,28 +67,58 @@ def protocol_to_formats_and_services( links ): return output -def fix_links( links ): +def fix_links( links, credentials=None ): fixed_links = links.copy() # the 'protocol' attribute is used, today in a rather meaningless way; let's try improving it... (WWW-LINK -> PDF, ...) for k, link in enumerate(links): + if 'unknown' in link.keys(): + del fixed_links[k]['unknown'] + # the 'description' attribute ends, sometimes, with (OGC:WMS), (OGC:WCS), ..., which is redundant and, in some case, erroneus. if 'description' in link.keys(): fixed_links[k]['description'] = re.sub(r'\(OGC:WMS\)|\(OGC:WCS\)|\(OGC:WFS\)|\(OGC:SOS\)', '', link['description']).strip() - # # FIX links pointing to external websites - # #logging.debug(link) - # if (link['protocol'] == 'WWW-LINK' and link['content-type'] == 'WWW:LINK'): - # fixed_links[k]['protocol'] = 'HTML' + # KML + if 'protocol' in link.keys() and link['protocol'] == "application/vnd.google-earth.kml+xml": + fixed_links[k]['protocol'] = 'KML' + continue + + # SOS + if 'protocol' in link.keys() and '/sos/' in link['url'].lower(): + fixed_links[k]['protocol'] = 'SOS' + continue # FIX links in which the declared protocol is as bizarre as "WWW:LINK-1.0-http--link" # The KML protocol needs also to be fixed. if 'protocol' in link.keys() and any( [x in link['protocol'] for x in ['WWW', 'kml', 'html', 'null'] ] ): + #print() + #print(link['url']) try: # let's try getting the information from the Web Server... 
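# --------------------------------------------------------------------------------------------
# Illustrative sketch only, not part of this commit: the hunk below replaces requests.head()
# with a urllib.request-based HEAD request so that HTTP Basic Auth credentials (taken from the
# per-domain 'credentials' config) can be attached; per the N.B. in that hunk, requests may
# otherwise drop the Content-Length header that is used to decide whether a link is really
# downloadable. The function name is hypothetical.
import base64
import urllib.request

def head_with_optional_basic_auth(url, username=None, password=None):
    req = urllib.request.Request(url, method="HEAD", headers={'User-Agent': 'Mozilla/5.0'})
    if username is not None and password is not None:
        token = base64.b64encode(('%s:%s' % (username, password)).encode('ascii'))
        req.add_header('Authorization', 'Basic %s' % token.decode('ascii'))
    resp = urllib.request.urlopen(req)
    headers = dict(resp.getheaders())  # e.g. headers.get('Content-Length'), headers.get('Content-Type')
    resp.close()
    return headers
# --------------------------------------------------------------------------------------------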
- resp = requests.head( link['url'], allow_redirects=True ) + with_credentials = False + for domain in credentials: + if domain in link['url'] and credentials[domain]['username'] != None and credentials[domain]['password']: + logging.info('Found a valid credential.') + with_credentials = True + username = credentials[domain]['username'] + password = credentials[domain]['password'] + break + + # N.B.: + # when used with Basic Auth, the requests library strips the Content-Length header from the response; + # that's why we use urllib.request here... + req = urllib.request.Request( link['url'], method="HEAD", headers={'User-Agent': 'Mozilla/5.0'}) + + if with_credentials: + base64string = base64.b64encode(("%s:%s" % (username, password)).encode('ascii')) + req.add_header("Authorization", "Basic %s" % base64string.decode('ascii')) + + resp = urllib.request.urlopen(req) + resp.close() + # the presence of the content-length assures that the Web Server knows what it is talking about, # that is why we include the following line in this try-except block, except for HTML pages, in which case # webservers do not send back the content-length @@ -93,11 +131,12 @@ def fix_links( links ): except Exception as e: logging.debug(e) + # ...otherwise, we make a guess on the basis of the information carried by the URL - known_formats = ['ecw', 'pdf', 'zip', 'kml', 'json', 'tif', 'tiff', 'csv', 'sos'] + known_formats = ['ecw', 'pdf', 'zip', 'json', 'tif', 'tiff', 'csv'] for known_format in known_formats: - if known_format in link['url'].lower(): + if link['url'].lower().endswith(known_format): fixed_links[k]['protocol'] = known_format.upper() continue @@ -235,6 +274,14 @@ if __name__ == '__main__': "protocol": "OGC:WCS", "content-type": "OGC:WCS", "unknown": "1" + }, + { + "name": "Document cadre relatif au dispositif inter-préfectoral en cas de pics de pollution", + "description": "Document cadre relatif au dispositif inter-préfectoral en cas de pics de pollution", + "url": "http://www.prefectures-regions.gouv.fr/auvergne-rhone-alpes/content/download/35211/238621/file/5-7-2017_recueil-84-2017-096-recueil-des-actes-administratifs-special.pdf", + "protocol": "WWW:LINK-1.0-http--link", + "content-type": "text/html", + "unknown": "1" } ] diff --git a/utils/postgis_helper.py b/utils/postgis_helper.py index e558cbafcce6768d877186f7032b90cdf0ece85d..96ecb460d24345e50096f4879250ede2822b15d2 100644 --- a/utils/postgis_helper.py +++ b/utils/postgis_helper.py @@ -10,9 +10,16 @@ from sqlalchemy import select from sqlalchemy import Table # Then load the Geometry type -from geoalchemy2 import Geometry # noqa +from geoalchemy2 import Geometry -#import type_utils +# -------------------------------------------------------------------- +import psycopg2.extensions +DEC2FLOAT = psycopg2.extensions.new_type( + psycopg2.extensions.DECIMAL.values, + 'DEC2FLOAT', + lambda value, curs: float(value) if value is not None else None) +psycopg2.extensions.register_type(DEC2FLOAT) +# -------------------------------------------------------------------- TYPE_PRIORITY_ORDER = ( @@ -62,7 +69,7 @@ class Remote(object): return [name for name in self.inspect.get_schema_names() if name != 'information_schema'] def get_tables(self, schema=None): - for table_name in self.inspect.get_table_names(schema=schema): + for table_name in self.inspect.get_table_names(schema=schema) + self.inspect.get_view_names(schema=schema): if table_name == 'spatial_ref_sys': continue yield self.get_table(table_name, schema=schema) @@ -90,20 +97,19 @@ class Remote(object): 
items = entry.items() properties = dict(items) + #print('************************here') + geometry = None try: + # this fails if properties['ST_AsGeoJSON_1'] = None geometry = json.loads(properties['ST_AsGeoJSON_1']) - del properties['ST_AsGeoJSON_1'] except: pass - # #print(items) - # if geom is not None: - # try: - # geometry = json.loads(items.pop()[1]) - # except TypeError: - # geom = None - #properties = dict(items) + try: + del properties['ST_AsGeoJSON_1'] + except: + pass document = { 'type': 'Feature', diff --git a/utils/serializers.py b/utils/serializers.py new file mode 100644 index 0000000000000000000000000000000000000000..3632e72f22eafe51fc7c3e35cf9ace41f820fb58 --- /dev/null +++ b/utils/serializers.py @@ -0,0 +1,76 @@ +import datetime +import pytz + +# cf. https://stackoverflow.com/questions/30313243/messagepack-and-datetime +def decode_datetime(obj): + if '__datetime__' in obj.keys(): + tmp = datetime.datetime.strptime(obj["as_str"], "%Y-%m-%dT%H:%M:%S.%fZ") + output = pytz.timezone('UTC').localize(tmp) + elif '__date__' in obj.keys(): + output = datetime.datetime.strptime(obj["as_str"], "%Y-%m-%d") + else: + output = obj + + return output + +# cf. https://stackoverflow.com/questions/30313243/messagepack-and-datetime +def encode_datetime(obj): + if isinstance(obj, datetime.datetime): + + if obj.tzinfo is None: + tmp1 = pytz.timezone("Europe/Paris").localize(obj) + else: + tmp1 = obj + + tmp2 = tmp1.astimezone(pytz.UTC) + + return {'__datetime__': True, 'as_str': tmp2.strftime("%Y-%m-%dT%H:%M:%S.%fZ")} + + if isinstance(obj, datetime.date): + return {'__date__': True, 'as_str': obj.strftime("%Y-%m-%d")} + # if isinstance(obj, Decimal): + # return float(obj) + return obj + + +if __name__ == '__main__': + + import pytz + + print('DATETIME WITHOUT TIMEZONE') + datetime_without_timezone = datetime.datetime.now() + print("Original:", datetime_without_timezone) + + encoded = encode_datetime(datetime_without_timezone) + print("Encoded:", encoded) + + decoded = decode_datetime(encoded) + print("Decoded:", decoded) + + + print('') + print('DATETIME WITH TIMEZONE') + # datetime_with_timezone = datetime.datetime.now() + tmp = pytz.timezone("Europe/Paris").localize(datetime.datetime.now()) + datetime_with_timezone = tmp.astimezone(pytz.timezone('America/New_York')) + print(datetime_with_timezone) + + print("Original:", datetime_with_timezone) + + encoded = encode_datetime(datetime_with_timezone) + print("Encoded:", encoded) + + decoded = decode_datetime(encoded) + print("Decoded:", decoded) + + + print('') + print('DATE') + date = datetime.date(2019, 3, 10) + print("Original:", date) + + encoded = encode_datetime(date) + print("Encoded:", encoded) + + decoded = decode_datetime(encoded) + print("Decoded:", decoded) diff --git a/utils/type_utils.py b/utils/type_utils.py index 101de7b5b3dafd7145d6b449f628d87d081181a8..b2aa4707b9c50660f8ef33c151461d99b83ab8b8 100644 --- a/utils/type_utils.py +++ b/utils/type_utils.py @@ -127,40 +127,46 @@ def convert_to_datetime( value ): #print('DATETIME conversion IN = ', value) #tmp1 = isoparse(value, dayfirst=True) - supported_formats = ['%Y-%m-%d %H:%M:%S', '%Y/%m/%d %H:%M:%S', '%d/%m/%Y %H:%M:%S', '%d-%m-%Y %H:%M:%S'] - supported_formats += ['%Y-%m-%dT%H:%M:%S%z'] - supported_formats += ['%Y-%m-%d %H:%M:%S%z'] - supported_formats += ['%Y-%m-%dT%H:%M:%S'] - supported_formats += ['%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y'] - - date_found = False - for i, the_format in enumerate(supported_formats): - #print(i, 'here', value, format) - try: - tmp1 = 
datetime.datetime.strptime( value, the_format ) - #tmp1 = pd.to_datetime( value, format=the_format ) - date_found = True - break - except Exception as e: - #print('!', value, e) - pass - - - - #let's make a further attempt! - if not date_found: + if isinstance( value, str ): + supported_formats = ['%Y-%m-%d %H:%M:%S', '%Y/%m/%d %H:%M:%S', '%d/%m/%Y %H:%M:%S', '%d-%m-%Y %H:%M:%S'] + supported_formats += ['%Y-%m-%dT%H:%M:%S%z'] + supported_formats += ['%Y-%m-%d %H:%M:%S%z'] + supported_formats += ['%Y-%m-%dT%H:%M:%S'] + supported_formats += ['%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y'] + + date_found = False for i, the_format in enumerate(supported_formats): + #print(i, 'here', value, format) try: - # some datetimes coming from PostgreSQL are formatted as '2015-01-12 10:30:20+01' - # the UTC offset needs to be padded... - tmp1 = datetime.datetime.strptime( value + '00', the_format ) - #tmp1 = pd.to_datetime( value + '00', format=the_format ) + tmp1 = datetime.datetime.strptime( value, the_format ) + #tmp1 = pd.to_datetime( value, format=the_format ) date_found = True break except Exception as e: - #print("!", value, e) + #print('!', value, e) pass + + + #let's make a further attempt! + if not date_found: + for i, the_format in enumerate(supported_formats): + try: + # some datetimes coming from PostgreSQL are formatted as '2015-01-12 10:30:20+01' + # the UTC offset needs to be padded... + tmp1 = datetime.datetime.strptime( value + '00', the_format ) + #tmp1 = pd.to_datetime( value + '00', format=the_format ) + date_found = True + break + except Exception as e: + #print("!", value, e) + pass + + else: + + date_found = True + tmp1 = value + if date_found: if tmp1.tzinfo is None:
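# --------------------------------------------------------------------------------------------
# Illustrative sketch only, not part of this commit: how the two msgpack hooks added in
# utils/serializers.py are meant to cooperate once a page of PostGIS entries travels through
# RabbitMQ (packed with default=encode_datetime in 3-doc-enricher.py, unpacked with
# object_hook=decode_datetime in 6-doc-processor.py). Naive datetimes are localized to
# Europe/Paris, converted to UTC, shipped as tagged dicts, and rebuilt as timezone-aware
# datetimes on the consumer side. Assumes the repository's utils package is importable and
# that msgpack and pytz are installed.
import datetime

import msgpack

from utils.serializers import decode_datetime, encode_datetime

page = [{'data-fr': {'properties': {'updated_at': datetime.datetime(2019, 3, 10, 10, 30, 20)}}}]

packed = msgpack.packb({'body': page}, use_bin_type=True, default=encode_datetime)
unpacked = msgpack.unpackb(packed, raw=False, object_hook=decode_datetime)

# unpacked['body'][0]['data-fr']['properties']['updated_at'] is now a timezone-aware datetime
# in UTC, ready for strftime('%Y-%m-%dT%H:%M:%S.%fZ') in fix_field_types() of 6-doc-processor.py.
# --------------------------------------------------------------------------------------------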