diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 6ce94b537390a3fd4ec405364363798f2a97594f..da522df9b05c5c02eb26d5dfe6a046a4fd8fea78 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -38,6 +38,19 @@ build_development:
     - docker-compose push
     - "curl -X POST -F token=$CI_JOB_TOKEN -F ref=master -F variables[TAG]=${TAG} https://forge.grandlyon.com/api/v4/projects/373/trigger/pipeline"
 
+build_development_manual:
+  stage: build
+  tags:
+    - build
+  when: manual
+  script:
+    - export TAG=$CI_COMMIT_SHORT_SHA
+    - docker-compose build metadata-processor
+    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
+    - docker-compose push
+    - "curl -X POST -F token=$CI_JOB_TOKEN -F ref=master -F variables[TAG]=${TAG} https://forge.grandlyon.com/api/v4/projects/373/trigger/pipeline"
+
+
 build_release:
   stage: build
   only:
diff --git a/lib/postgis_helper.py b/lib/postgis_helper.py
index 0465027fe6eb798f227f50fab89ed6183d7fed85..3f6d1ddd428a21a13d6ea6ef586b7aca5fafc96e 100644
--- a/lib/postgis_helper.py
+++ b/lib/postgis_helper.py
@@ -80,7 +80,7 @@ class Remote(object):
     def count_entries(self, table):
         return self.engine.execute(table.count()).first()[0]
 
-    def get_entries(self, table):
+    def get_entries(self, table, limit=None):
         columns, geom = self.get_columns(table)
         fields = [table.c[col.name] for col in columns]
 
@@ -93,6 +93,9 @@ class Remote(object):
             fields.append(the_geom)
 
         selected = select(fields)
+        if limit is not None:
+            selected = selected.limit(limit)
+
         for entry in self.engine.execute(selected):
             items = entry.items()
             properties = dict(items)
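For context, the new `limit` parameter of Remote.get_entries() caps the SELECT through SQLAlchemy's Select.limit(). A minimal standalone sketch of the same pattern, assuming SQLAlchemy 1.x (the API style the surrounding code uses) and placeholder connection details:

from sqlalchemy import MetaData, Table, create_engine, select

# Placeholder connection string and table name, for illustration only.
engine = create_engine('postgresql://user:secret@localhost/mydb')
table = Table('my_table', MetaData(), autoload_with=engine)

selected = select([col for col in table.columns])
limit = 100
if limit is not None:
    selected = selected.limit(limit)  # caps the result set at `limit` rows

for entry in engine.execute(selected):
    print(dict(entry.items()))

This is what lets field_type_detector.py (below) sample a bounded number of rows per table instead of scanning each table in full.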
diff --git a/tools/field_type_detector.py b/tools/field_type_detector.py
index 91a7863e32b6db77740c43aa14feb48be609ac83..3a04ffc16dc9d27a0b1eb9a8478e8bcb9a6fee84 100644
--- a/tools/field_type_detector.py
+++ b/tools/field_type_detector.py
@@ -132,8 +132,16 @@ def generate_field_catalog( cfg ):
     for dbname in dbnames:
         logging.info('Analyzing database %s...' % dbname)
-        logging.info('Establishing a database connection...')
-        pg = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password'])
+
+
+
+        if 'timeseries' in dbname:
+            logging.info('Establishing a database connection to timeseries data...')
+            pg = Remote(hostname=cfg['postgis']['timeseries']['host'], dbname=dbname, username=cfg['postgis']['timeseries']['username'], password=cfg['postgis']['timeseries']['password'])
+        else:
+            logging.info('Establishing a database connection...')
+            pg = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password'])
+
         logging.info('Done.')
 
         if isinstance(cfg['postgis']['databases'][dbname], dict) and 'whitelist' in cfg['postgis']['databases'][dbname].keys():
@@ -151,6 +159,13 @@ def generate_field_catalog( cfg ):
         logging.info('Getting schemas...')
         schema_names = pg.get_schema_names()
+        if 'timeseries' in dbname:
+            schema_names.remove('_timescaledb_cache')
+            schema_names.remove('_timescaledb_catalog')
+            schema_names.remove('_timescaledb_config')
+            schema_names.remove('_timescaledb_internal')
+            schema_names.remove('timescaledb_experimental')
+            schema_names.remove('timescaledb_information')
         logging.info('Done: %s', schema_names)
 
         for schema_name in schema_names:
@@ -179,7 +194,10 @@ def generate_field_catalog( cfg ):
                 fields = [ col.name for col in columns ]
 
                 elected_type_by_field_by_dbschematable[db_schema_table] = { "fields": fields, "types": {} }
-                for doc in pg.get_entries(table):
+                limit = 1000
+                if 'timeseries' in dbname:
+                    limit = 100
+                for doc in pg.get_entries(table, limit):
 
                     properties = doc['properties']
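The 'timeseries' branch above implies a dedicated sub-section of the configuration. A sketch of the shape generate_field_catalog() now reads, where the keys come from the accesses in the patch and all values are placeholders:

cfg = {
    'postgis': {
        'host': 'postgis.example.org',
        'username': 'reader',
        'password': 'secret',
        'databases': {},  # per-database options, incl. the optional 'whitelist'
        # used instead of the top-level credentials when the
        # database name contains 'timeseries'
        'timeseries': {
            'host': 'timescaledb.example.org',
            'username': 'reader',
            'password': 'secret',
        },
    },
}

Note that list.remove() raises ValueError when the item is absent, so the chain of schema_names.remove(...) calls assumes every TimescaleDB system schema exists in each timeseries database. A more defensive equivalent, should that assumption ever break:

TIMESCALEDB_INTERNAL_SCHEMAS = {
    '_timescaledb_cache', '_timescaledb_catalog', '_timescaledb_config',
    '_timescaledb_internal', 'timescaledb_experimental', 'timescaledb_information',
}
schema_names = [s for s in schema_names if s not in TIMESCALEDB_INTERNAL_SCHEMAS]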
diff --git a/workers/doc_enricher.py b/workers/doc_enricher.py
index fc9c7ee4ce79d2d2a4e62fcc7d4792c06cddc4e5..d9a7bc93b39fee432e3b2d0aabac0e44b6f9d6d1 100644
--- a/workers/doc_enricher.py
+++ b/workers/doc_enricher.py
@@ -18,11 +18,15 @@ from lib.postgis_helper import Remote
 from lib.serializers import encode_datetime
 
 
-def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
+def get_entries_from_postgis( link, cfg, no_features_per_page=500 ):
 
     dbname = link['url'].split('/')[-1]
     schema, table_name = link['name'].split('.')
-
+    maxFeatures = 2000000
+    if 'timeseries' in dbname:
+        maxFeatures = 50
+        cfg = cfg['timeseries']
+
     logging.info('Getting data from database %s...' % dbname)
     logging.info('Establishing a database connection...')
     pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password'])
@@ -41,6 +45,10 @@ def get_entries_from_postgis( link, cfg, no_features_per_page=500 ):
     serialized_deferred_count = dill.dumps(deferred_count)
 
     count = pg.count_entries(table)
+
+    if count >= maxFeatures:
+        count = maxFeatures
+
     no_pages = int( math.ceil(count/no_features_per_page) )
     logging.debug('No. of pages = %i' % no_pages)
@@ -53,16 +61,23 @@ def get_entries_from_postgis( link, cfg, no_features_per_page=500 ):
 
     feature_page = [] # we accumulate entries in this sort of buffer
     cnt = 0
+    totalCnt = 0
     for entry in pg.get_entries(table):
         feature_page.append(entry)
-        if len(feature_page) == no_features_per_page:
+        totalCnt += 1
+        if len(feature_page) == no_features_per_page or totalCnt >= count:
             cnt += 1
+            print('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page)))
             logging.debug('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page)))
             yield (cnt/no_pages, count, serialized_deferred_count, feature_page)
             feature_page = []
+            if totalCnt >= count:
+                return
 
+    if cnt/no_pages != 1:
+        print('Yielding last page with %i features' % len(feature_page))
     logging.debug('Yielding last page with %i features' % len(feature_page))
     yield (1, count, serialized_deferred_count, feature_page) # this will be the last feature_page
diff --git a/workers/doc_indexer.py b/workers/doc_indexer.py
index 2a6494876869e9a635f236431c17918f028d094d..fac0eafd61dfa4f793e5a938d219278fd10daffa 100644
--- a/workers/doc_indexer.py
+++ b/workers/doc_indexer.py
@@ -37,6 +37,7 @@ def tag_doc( the_doc ):
 
     # isQueryable?
     tag_dict['isQueryable'] = False # default
+
     if 'link' in the_doc['metadata-fr'].keys():
         for link in the_doc['metadata-fr']['link']:
             #print(link)
@@ -44,6 +45,14 @@ def tag_doc( the_doc ):
                 tag_dict['isQueryable'] = True
                 break
 
+    tag_dict['isTimeseries'] = False # default
+    if 'link' in the_doc['metadata-fr'].keys():
+        for link in the_doc['metadata-fr']['link']:
+            #print(link)
+            if 'service' in link.keys() and link['service'] == 'WS' and 'timeseries' in link['url']:
+                tag_dict['isTimeseries'] = True
+                break
+
     # N.B.: in order to determine the following tags, we need the data-fr field;
     # in case the data-fr field is absent, the tags 'isSearchable',
     # 'isPunctual', 'isLinear', 'isAreal' will be absent instead of being 'falsely' set to false!
diff --git a/workers/metadata_processor.py b/workers/metadata_processor.py
index 217bddf9edfce4d73d901451cf691c207b807265..90f4862bbbdb51f5cf284a3ac2b973e7a29e61a1 100644
--- a/workers/metadata_processor.py
+++ b/workers/metadata_processor.py
@@ -305,7 +305,7 @@ def callback( channel, method, properties, body ):
 
     for link in out_record['metadata-fr']['link']:
         #TODO: generalize!
-        if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']:
+        if 'service' in link.keys() and link['service'] in ('WFS', 'WS') and 'data.grandlyon.com' in link['url']:
             logging.debug('EUREKA : found a WFS ressource!')
             wfs_found = True
             full_version = out_record.copy() # including metadata AND data
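Taken together, the doc_enricher changes cap each timeseries table at maxFeatures rows and stop paging as soon as the cap is reached. The control flow is easier to see in isolation; a minimal sketch with generic names (paginate, page_size, max_features are illustrative, not from the codebase):

import math

def paginate(entries, page_size, max_features):
    # Cap the advertised count, as the patch does with count/maxFeatures.
    count = min(len(entries), max_features)
    no_pages = int(math.ceil(count / page_size))
    page, pages_out, seen = [], 0, 0
    for entry in entries:
        page.append(entry)
        seen += 1
        # Flush a page when it is full, or when the cap is reached.
        if len(page) == page_size or seen >= count:
            pages_out += 1
            yield (pages_out / no_pages, count, page)
            page = []
            if seen >= count:
                return  # early exit: nothing past the cap is yielded
    # Leftover entries when the source ran out before the cap.
    if page:
        yield (1, count, page)

# e.g. [p[0] for p in paginate(range(120), page_size=50, max_features=100)]
# -> [0.5, 1.0]: two full pages, then the generator stops at the cap.

One behavioural caveat worth noting: in get_entries_from_postgis() the loop still iterates pg.get_entries(table) without the new limit argument, so rows beyond the cap are never yielded but may still be fetched from the database before the early return fires.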