From 40054ae3e5ac85c21ac9682896a09605339c7f59 Mon Sep 17 00:00:00 2001
From: Alessandro Cerioni <acerioni@grandlyon.com>
Date: Wed, 6 Mar 2019 18:54:07 +0100
Subject: [PATCH] First fully working (?) version getting data from PostGIS.

---
 0-reset-session.py          |   3 +-
 2-metadata-processor.py     |  14 ++---
 3-doc-enricher.py           | 114 +++++++++++++++++++++++++++++++++++-
 5-pg-field-type-detector.py |  36 +++++++-----
 6-doc-processor.py          |  10 +++-
 7-doc-indexer.py            |   2 +-
 8-reindexer.py              |   2 +-
 utils/close_connection.py   |   3 +-
 utils/postgis_helper.py     |  26 ++++----
 utils/type_utils.py         |  60 ++++++++++---------
 10 files changed, 203 insertions(+), 67 deletions(-)

diff --git a/0-reset-session.py b/0-reset-session.py
index 6c7ad9f..f6032fc 100644
--- a/0-reset-session.py
+++ b/0-reset-session.py
@@ -38,7 +38,8 @@ if __name__ == '__main__':
 
     # delete this session files in the working_directory
     for filename in glob.glob("%s/%s*" % (cfg['session']['working_directory'], cfg['session']['id'])):
-        os.remove(filename)
+        if 'json' not in filename:
+            os.remove(filename)
 
 
     # delete MongoDB collections/records related to this session
diff --git a/2-metadata-processor.py b/2-metadata-processor.py
index 60473d1..82d61d5 100644
--- a/2-metadata-processor.py
+++ b/2-metadata-processor.py
@@ -111,11 +111,11 @@ def list_to_dictlist( the_input, the_context=None ):
 
     if the_context == 'geoBox':
         polygon = []
-        polygon.append([the_output[0]['westBoundLongitude'], the_output[0]['southBoundLatitude']])
-        polygon.append([the_output[0]['eastBoundLongitude'], the_output[0]['southBoundLatitude']])
-        polygon.append([the_output[0]['eastBoundLongitude'], the_output[0]['northBoundLatitude']])
-        polygon.append([the_output[0]['westBoundLongitude'], the_output[0]['northBoundLatitude']])
-        polygon.append([the_output[0]['westBoundLongitude'], the_output[0]['southBoundLatitude']])
+        polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])])
+        polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['southBoundLatitude'])])
+        polygon.append([float(the_output[0]['eastBoundLongitude']), float(the_output[0]['northBoundLatitude'])])
+        polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['northBoundLatitude'])])
+        polygon.append([float(the_output[0]['westBoundLongitude']), float(the_output[0]['southBoundLatitude'])])
 
         the_output = {'type': 'Polygon', 'coordinates': [polygon]}
 
@@ -271,8 +271,8 @@ def process_page( channel, method, properties, body, **kwargs):
         if 'link' in metadata_record['metadata-fr'].keys():
 
             for link in metadata_record['metadata-fr']['link']:
-
-                if 'service' in link.keys() and link['service'] == 'WFS':
+                #TODO: generalize!
+                if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']:
                     logging.debug('EUREKA : found a WFS ressource!')
                     wfs_found = True
                     #documents_to_index = []
diff --git a/3-doc-enricher.py b/3-doc-enricher.py
index affbc71..cb3fd27 100644
--- a/3-doc-enricher.py
+++ b/3-doc-enricher.py
@@ -2,8 +2,52 @@
 import pika
 import msgpack
 import requests
 import json
+import datetime
+#from decimal import Decimal
 from utils.exit_gracefully import exit_gracefully
 from utils.my_logging import logging
+from utils.postgis_helper import Remote
+from utils.serializers import encode_datetime
+
+
+def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
+    #print('here', link)
+
+    dbname = link['url'].split('/')[-1]
+    schema, table = link['name'].split('.')
+    print(dbname, schema, table)
+    #print(cfg)
+    #exit(1)
+
+    logging.info('Getting data from database %s...' % dbname)
+    logging.info('Establishing a database connection...')
+    pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password'])
+    logging.info('Done.')
+
+    table = pg.get_table(table, schema=schema)
+
+
+    #print(pg.get_tables('bruit'))
+
+    count = pg.count_entries(table)
+
+    no_pages = count//no_features_per_page+1
+    logging.info('Getting %i entries in %i pages from table %s.%s...'% (count, no_pages, schema, table))
+
+    feature_page = [] # we accumulate entries in this sort of buffer
+    cnt = 0
+    for entry in pg.get_entries(table):
+        feature_page.append(entry)
+        if len(feature_page) == no_features_per_page:
+            cnt += 1
+            print('YIELDING PAGE %i/%i' % (cnt, no_pages))
+            yield feature_page
+            feature_page = []
+
+    print('YIELDING LAST PAGE', len(feature_page))
+    yield feature_page # this will be the last feature_page
+
+    return
 
 def get_wfs( link, offset=0, no_features_per_page=1000 ):
@@ -66,7 +110,7 @@
 
 
 
-def enrich_docs( channel, method, properties, body, **kwargs ):
+def old_enrich_docs( channel, method, properties, body, **kwargs ):
 
     decoded_body = msgpack.unpackb(body, raw=False)
 
@@ -80,6 +124,7 @@
 
     feature_page = get_wfs(wfs_info, offset, kwargs['features_per_page'])
+    #feature_page = get_entries_from_postgis(wfs_info, kwargs['postgis_cfg'])
 
     # we implement pagination by letting this program creating tasks for itself / its siblings
    if feature_page != None and len(feature_page) == kwargs['features_per_page']: # at least another page is needed
 
@@ -99,7 +144,6 @@
         # #for feature_page in feature_pages:
         #     #print(feature_page[0]['properties']['nom_reduit'])
         logging.info('Sending feature page of len = %i to RabbitMQ and MongoDB...' % len(feature_page))
-
         doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page]
 
         msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page}
@@ -134,6 +178,69 @@
 
     return #out_docs
 
+def enrich_docs( channel, method, properties, body, **kwargs ):
+
+
+
+
+    decoded_body = msgpack.unpackb(body, raw=False)
+
+    wfs_info = decoded_body['header']['wfs_info']
+    offset = decoded_body['header']['offset']
+    session_id = decoded_body['header']['session_id']
+    dest_index = decoded_body['header']['dest_index']
+
+
+    logging.info('Enriching dataset named: %s' % decoded_body['body']['metadata-fr']['title'])
+
+    #feature_page = get_entries_from_postgis(wfs_info, kwargs['postgis_cfg'])
+
+    #if feature_page != None:
+    #print('here')
+    for feature_page in get_entries_from_postgis(wfs_info, kwargs['postgis_cfg']):
+        #print('here')
+        logging.info('Sending feature page of len = %i to RabbitMQ...' % len(feature_page))
+        # for el in feature_page:
+        #     for key, v in el.items():
+        #         print(key, v)
+        #print("LEN", len(feature_page))
+        #continue
+
+        doc_page = [{**decoded_body['body'], 'data-fr': feature} for feature in feature_page]
+
+        msg = {'header': {'metadata': decoded_body['body'], 'session_id': session_id, 'dest_index': dest_index}, 'body': doc_page}
+        #print('here')
+        the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
+        #print('there')
+
+        # channel.basic_publish( exchange=kwargs['exchange'],
+        #                        routing_key=kwargs['doc_pages_to_store_in_mongo_rk'],
+        #                        body=the_body,
+        #                        properties=pika.BasicProperties(delivery_mode = 2)
+        #                      )
+
+
+        channel.basic_publish( exchange=kwargs['exchange'],
+                               routing_key=kwargs['doc_pages_to_process_rk'],
+                               body=the_body,
+                               properties=pika.BasicProperties(delivery_mode = 2)
+                             )
+
+        logging.info('...done!')
+
+    # except TypeError: # it means that getWFS returned None
+    #     pass
+    #     #
+    #     #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+
+
+
+    channel.basic_ack(delivery_tag = method.delivery_tag)
+    #channel.basic_nack(delivery_tag = method.delivery_tag, requeue=1)
+
+    return #out_docs
+
+
 def main(cfg):
 
     from utils.close_connection import on_timeout
@@ -172,7 +279,8 @@
                                                    doc_pages_to_store_in_mongo_rk=doc_pages_to_store_in_mongo_rk,
                                                    docs_to_enrich_rk=docs_to_enrich_rk,
                                                    doc_pages_to_process_rk=doc_pages_to_process_rk,
-                                                   features_per_page=cfg['wfs']['features_per_page']),
+                                                   features_per_page=cfg['wfs']['features_per_page'],
+                                                   postgis_cfg=cfg['postgis']),
                           queue=docs_to_enrich_qn)#, no_ack=True)
     channel.start_consuming()
     connection.close()
diff --git a/5-pg-field-type-detector.py b/5-pg-field-type-detector.py
index f3c85c6..15faf28 100644
--- a/5-pg-field-type-detector.py
+++ b/5-pg-field-type-detector.py
@@ -1,19 +1,26 @@
 import sys
 import json
 import re
-import pika
-import itertools
+#import pika
+#import itertools
 import time
 
-from sqlalchemy import create_engine
-from sqlalchemy.engine import reflection
-from sqlalchemy import MetaData
-from sqlalchemy import select
-from sqlalchemy import Table
-
-# Then load the Geometry type
-from geoalchemy2 import Geometry # noqa
-
-
+# from sqlalchemy import create_engine
+# from sqlalchemy.engine import reflection
+# from sqlalchemy import MetaData
+# from sqlalchemy import select
+# from sqlalchemy import Table
+#
+# # Then load the Geometry type
+# from geoalchemy2 import Geometry # noqa
+#
+# # --------------------------------------------------------------------
+# import psycopg2.extensions
+# DEC2FLOAT = psycopg2.extensions.new_type(
+#     psycopg2.extensions.DECIMAL.values,
+#     'DEC2FLOAT',
+#     lambda value, curs: float(value) if value is not None else None)
+# psycopg2.extensions.register_type(DEC2FLOAT)
+# # --------------------------------------------------------------------
 from utils.flatten_utils import flatten_json, unflatten_json
 from utils.type_utils import detect_type, detect_type_new
@@ -191,8 +198,8 @@ def generate_field_catalog( cfg, pg, catalog=None ):
     else:
         output = catalog.copy()
 
-    # selected_schema = "rdata"
-    # selected_table = selected_schema + ".sit_sitra.sittourisme"
+    # selected_schema = "sit_sitra"
+    # selected_table = selected_schema + ".sittourisme"
     # found = False
 
     logging.info('Getting schemas...')
@@ -207,6 +214,7 @@
         #     continue
         # print(schema_name)
         for table in pg.get_tables(schema_name):
+            # print(schema_name)
             # if str(table) != selected_table:
             #     continue
             # else:
diff --git a/6-doc-processor.py b/6-doc-processor.py
index 8409753..eae6c00 100644
--- a/6-doc-processor.py
+++ b/6-doc-processor.py
@@ -2,14 +2,17 @@
 import pika
 import msgpack
 #from pprint import pprint
 #from dateutil.parser import parse
+import datetime
 import re
 import json
+import sys
 from utils.flatten_utils import flatten_json, unflatten_json
 #import pytz
 #import sys
-from utils.type_utils import convert_to_datetime, convert_to_float, convert_to_int, convert_to_str
+from utils.type_utils import convert_to_datetime, convert_to_float, convert_to_int, convert_to_str, convert_to_boolean
 from utils.exit_gracefully import exit_gracefully
 from utils.my_logging import logging
+from utils.serializers import decode_datetime
 
 
@@ -70,7 +73,10 @@ def fix_field_types( in_docs, out_types ):
 
 def process_docs( channel, method, properties, body, **kwargs ):
 
-    decoded_body = msgpack.unpackb(body, raw=False)
+
+    decoded_body = msgpack.unpackb(body, raw=False, object_hook=decode_datetime)
+    #print(decoded_body)
+    #exit(0)
 
     the_dest_index = decoded_body['header']['dest_index']
     docs = decoded_body['body']
diff --git a/7-doc-indexer.py b/7-doc-indexer.py
index 6285613..9bd8c0d 100644
--- a/7-doc-indexer.py
+++ b/7-doc-indexer.py
@@ -66,7 +66,7 @@ def tag_doc( the_doc ):
         if any( [x in the_doc['data-fr']['geometry']['type'] for x in ['Polygon', 'MultiPolygon']] ):
             tag_dict['isAreal'] = True
 
-    tagged_doc = {'editorial-metadata-en': tag_dict, **the_doc}
+    tagged_doc = {'editorial-metadata': tag_dict, **the_doc}
 
     return tagged_doc
 
diff --git a/8-reindexer.py b/8-reindexer.py
index d4d0300..5774697 100644
--- a/8-reindexer.py
+++ b/8-reindexer.py
@@ -61,7 +61,7 @@ def main(cfg, wait=True):
 
     body['source']['remote'] = cfg['reindexer']['source_url']
     logging.debug(body)
-
+ 
    rep = es.reindex(body, wait_for_completion=False) #, slices=24)#, timeout='120s')# timeout='120s', slices=1000, requests_per_second=-1)#, timeout='2m')
     logging.info(rep)
 
diff --git a/utils/close_connection.py b/utils/close_connection.py
index bbb62bd..3a6c092 100644
--- a/utils/close_connection.py
+++ b/utils/close_connection.py
@@ -1,4 +1,4 @@
-#import sys
+import sys
 from .my_logging import logging
 
 def on_timeout(the_connection):
@@ -8,6 +8,7 @@ def on_timeout(the_connection):
     # global connection
     logging.info('Waiting for messages...')
     the_connection.close()
+    #logging.info('Exiting.')
     #sys.exit(0)
     #return
 
diff --git a/utils/postgis_helper.py b/utils/postgis_helper.py
index e558cba..1b26ba5 100644
--- a/utils/postgis_helper.py
+++ b/utils/postgis_helper.py
@@ -10,9 +10,16 @@ from sqlalchemy import select
 from sqlalchemy import Table
 
 # Then load the Geometry type
-from geoalchemy2 import Geometry # noqa
+from geoalchemy2 import Geometry
 
-#import type_utils
+# --------------------------------------------------------------------
+import psycopg2.extensions
+DEC2FLOAT = psycopg2.extensions.new_type(
+    psycopg2.extensions.DECIMAL.values,
+    'DEC2FLOAT',
+    lambda value, curs: float(value) if value is not None else None)
+psycopg2.extensions.register_type(DEC2FLOAT)
+# --------------------------------------------------------------------
 
 
 TYPE_PRIORITY_ORDER = (
@@ -90,20 +97,19 @@ class Remote(object):
             items = entry.items()
             properties = dict(items)
 
+            #print('************************here')
+
             geometry = None
             try:
+                # this fails if properties['ST_AsGeoJSON_1'] = None
                 geometry = json.loads(properties['ST_AsGeoJSON_1'])
-                del properties['ST_AsGeoJSON_1']
             except:
                 pass
 
-            # #print(items)
-            # if geom is not None:
-            #     try:
-            #         geometry = json.loads(items.pop()[1])
-            #     except TypeError:
-            #         geom = None
-            #properties = dict(items)
+            try:
+                del properties['ST_AsGeoJSON_1']
+            except:
+                pass
 
             document = {
                 'type': 'Feature',
diff --git a/utils/type_utils.py b/utils/type_utils.py
index 101de7b..b2aa470 100644
--- a/utils/type_utils.py
+++ b/utils/type_utils.py
@@ -127,40 +127,46 @@ def convert_to_datetime( value ):
     #print('DATETIME conversion IN = ', value)
 
     #tmp1 = isoparse(value, dayfirst=True)
-    supported_formats = ['%Y-%m-%d %H:%M:%S', '%Y/%m/%d %H:%M:%S', '%d/%m/%Y %H:%M:%S', '%d-%m-%Y %H:%M:%S']
-    supported_formats += ['%Y-%m-%dT%H:%M:%S%z']
-    supported_formats += ['%Y-%m-%d %H:%M:%S%z']
-    supported_formats += ['%Y-%m-%dT%H:%M:%S']
-    supported_formats += ['%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y']
-
-    date_found = False
-    for i, the_format in enumerate(supported_formats):
-        #print(i, 'here', value, format)
-        try:
-            tmp1 = datetime.datetime.strptime( value, the_format )
-            #tmp1 = pd.to_datetime( value, format=the_format )
-            date_found = True
-            break
-        except Exception as e:
-            #print('!', value, e)
-            pass
-
-
-
-    #let's make a further attempt!
-    if not date_found:
+    if isinstance( value, str ):
+        supported_formats = ['%Y-%m-%d %H:%M:%S', '%Y/%m/%d %H:%M:%S', '%d/%m/%Y %H:%M:%S', '%d-%m-%Y %H:%M:%S']
+        supported_formats += ['%Y-%m-%dT%H:%M:%S%z']
+        supported_formats += ['%Y-%m-%d %H:%M:%S%z']
+        supported_formats += ['%Y-%m-%dT%H:%M:%S']
+        supported_formats += ['%Y-%m-%d', '%Y/%m/%d', '%d/%m/%Y', '%d-%m-%Y']
+
+        date_found = False
         for i, the_format in enumerate(supported_formats):
+            #print(i, 'here', value, format)
             try:
-                # some datetimes coming from PostgreSQL are formatted as '2015-01-12 10:30:20+01'
-                # the UTC offset needs to be padded...
-                tmp1 = datetime.datetime.strptime( value + '00', the_format )
-                #tmp1 = pd.to_datetime( value + '00', format=the_format )
+                tmp1 = datetime.datetime.strptime( value, the_format )
+                #tmp1 = pd.to_datetime( value, format=the_format )
                 date_found = True
                 break
             except Exception as e:
-                #print("!", value, e)
+                #print('!', value, e)
                 pass
+
+
+        #let's make a further attempt!
+        if not date_found:
+            for i, the_format in enumerate(supported_formats):
+                try:
+                    # some datetimes coming from PostgreSQL are formatted as '2015-01-12 10:30:20+01'
+                    # the UTC offset needs to be padded...
+                    tmp1 = datetime.datetime.strptime( value + '00', the_format )
+                    #tmp1 = pd.to_datetime( value + '00', format=the_format )
+                    date_found = True
+                    break
+                except Exception as e:
+                    #print("!", value, e)
+                    pass
+
+    else:
+
+        date_found = True
+        tmp1 = value
+
 
     if date_found:
 
         if tmp1.tzinfo is None:
-- 
GitLab
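Side note on a dependency this patch does not ship: 3-doc-enricher.py packs pages with msgpack.packb(..., default=encode_datetime) and 6-doc-processor.py unpacks them with msgpack.unpackb(..., object_hook=decode_datetime), both imported from utils.serializers, which is absent from the diff above. A minimal sketch of what such a module could look like, assuming the usual msgpack default/object_hook pattern (the tag key and timestamp format below are illustrative, not taken from the repository):

import datetime

def encode_datetime(obj):
    # Called by msgpack.packb(..., default=encode_datetime) for objects msgpack
    # cannot serialize natively; datetimes become a tagged, packable dict.
    if isinstance(obj, datetime.datetime):
        return {'__datetime__': True, 'as_str': obj.strftime('%Y%m%dT%H:%M:%S.%f')}
    return obj

def decode_datetime(obj):
    # Called by msgpack.unpackb(..., object_hook=decode_datetime) on every decoded
    # map; tagged dicts are restored to datetime objects, everything else passes through.
    if '__datetime__' in obj:
        return datetime.datetime.strptime(obj['as_str'], '%Y%m%dT%H:%M:%S.%f')
    return obj

Note that this naive version drops timezone information; PostGIS timestamps carrying a UTC offset (the '2015-01-12 10:30:20+01' case handled in utils/type_utils.py) would need an extra field or a %z-aware format.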