Commit 038b8077, authored by Damien DESPRES
Redmine 13657: timeseries handling

Routes databases whose name contains 'timeseries' to a dedicated PostGIS connection, skips TimescaleDB's internal schemas, caps the number of rows sampled for the field catalog and exported per dataset, tags records exposing a timeseries web service (isTimeseries), and adds a manual variant of the development build job.

parent 4095c5eb
Merge request !27: Redmine 13657 timeseries handling
@@ -38,6 +38,19 @@ build_development:
     - docker-compose push
     - "curl -X POST -F token=$CI_JOB_TOKEN -F ref=master -F variables[TAG]=${TAG} https://forge.grandlyon.com/api/v4/projects/373/trigger/pipeline"

+build_development_manual:
+  stage: build
+  tags:
+    - build
+  when: manual
+  script:
+    - export TAG=$CI_COMMIT_SHORT_SHA
+    - docker-compose build metadata-processor
+    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
+    - docker-compose push
+    - "curl -X POST -F token=$CI_JOB_TOKEN -F ref=master -F variables[TAG]=${TAG} https://forge.grandlyon.com/api/v4/projects/373/trigger/pipeline"
+
 build_release:
   stage: build
   only:
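Judging by the visible lines, the new job duplicates the build_development steps (build, login, push, pipeline trigger) and only adds when: manual, so it appears in the pipeline view but runs solely when a user starts it by hand.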
@@ -80,7 +80,7 @@ class Remote(object):
     def count_entries(self, table):
         return self.engine.execute(table.count()).first()[0]

-    def get_entries(self, table):
+    def get_entries(self, table, limit=None):
         columns, geom = self.get_columns(table)
         fields = [table.c[col.name] for col in columns]
@@ -93,6 +93,9 @@ class Remote(object):
         fields.append(the_geom)

         selected = select(fields)
+        if limit is not None:
+            selected = selected.limit(limit)
+
         for entry in self.engine.execute(selected):
            items = entry.items()
            properties = dict(items)
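For reference, a minimal sketch of how the new limit parameter is meant to be called from client code. The connection values and the get_table helper are illustrative assumptions, not code from this repository:

    from lib.postgis_helper import Remote

    pg = Remote(hostname='localhost', dbname='my_timeseries_db',
                username='reader', password='secret')  # placeholder credentials
    table = pg.get_table('sensor_data', schema='public')  # hypothetical helper
    # Stream at most 100 rows instead of scanning the whole table.
    for doc in pg.get_entries(table, limit=100):
        print(doc['properties'])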
@@ -132,8 +132,16 @@ def generate_field_catalog( cfg ):
     for dbname in dbnames:
         logging.info('Analyzing database %s...' % dbname)
-        logging.info('Establishing a database connection...')
-        pg = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password'])
+        if 'timeseries' in dbname:
+            logging.info('Establishing a database connection to timeseries data...')
+            pg = Remote(hostname=cfg['postgis']['timeseries']['host'], dbname=dbname, username=cfg['postgis']['timeseries']['username'], password=cfg['postgis']['timeseries']['password'])
+        else:
+            logging.info('Establishing a database connection...')
+            pg = Remote(hostname=cfg['postgis']['host'], dbname=dbname, username=cfg['postgis']['username'], password=cfg['postgis']['password'])
         logging.info('Done.')

         if isinstance(cfg['postgis']['databases'][dbname], dict) and 'whitelist' in cfg['postgis']['databases'][dbname].keys():
@@ -151,6 +159,13 @@ def generate_field_catalog( cfg ):
         logging.info('Getting schemas...')
         schema_names = pg.get_schema_names()
+        if 'timeseries' in dbname:
+            schema_names.remove('_timescaledb_cache')
+            schema_names.remove('_timescaledb_catalog')
+            schema_names.remove('_timescaledb_config')
+            schema_names.remove('_timescaledb_internal')
+            schema_names.remove('timescaledb_experimental')
+            schema_names.remove('timescaledb_information')
         logging.info('Done: %s', schema_names)

         for schema_name in schema_names:
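One caveat: list.remove raises ValueError if a name is absent, and not every TimescaleDB version exposes all of these schemas. A more tolerant variant, as a sketch rather than what the commit does:

    # Internal TimescaleDB schemas carry no user data; drop whichever are present.
    TIMESCALEDB_SCHEMAS = {
        '_timescaledb_cache', '_timescaledb_catalog', '_timescaledb_config',
        '_timescaledb_internal', 'timescaledb_experimental', 'timescaledb_information',
    }
    schema_names = [s for s in schema_names if s not in TIMESCALEDB_SCHEMAS]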
@@ -179,7 +194,10 @@ def generate_field_catalog( cfg ):
                 fields = [ col.name for col in columns ]
                 elected_type_by_field_by_dbschematable[db_schema_table] = { "fields": fields, "types": {} }

-                for doc in pg.get_entries(table):
+                limit = 1000
+                if 'timeseries' in dbname:
+                    limit = 100
+                for doc in pg.get_entries(table, limit):
                     properties = doc['properties']
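Presumably the field catalog only needs a sample of rows to elect a type per field, which is why the scan is now capped at 1,000 rows per table, and at 100 for timeseries hypertables, which tend to be far larger.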
@@ -18,11 +18,15 @@ from lib.postgis_helper import Remote
 from lib.serializers import encode_datetime

-def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
+def get_entries_from_postgis( link, cfg, no_features_per_page=500 ):

     dbname = link['url'].split('/')[-1]
     schema, table_name = link['name'].split('.')

+    maxFeatures = 2000000
+    if 'timeseries' in dbname:
+        maxFeatures = 50
+        cfg = cfg['timeseries']

     logging.info('Getting data from database %s...' % dbname)
     logging.info('Establishing a database connection...')
     pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password'])
@@ -41,6 +45,10 @@ def get_entries_from_postgis( link, cfg, no_features_per_page=500 ):
     serialized_deferred_count = dill.dumps(deferred_count)

     count = pg.count_entries(table)
+    if count >= maxFeatures:
+        count = maxFeatures
+
     no_pages = int( math.ceil(count/no_features_per_page) )
     logging.debug('No. of pages = %i' % no_pages)
@@ -53,16 +61,23 @@ def get_entries_from_postgis( link, cfg, no_features_per_page=500 ):
     feature_page = [] # we accumulate entries in this sort of buffer
     cnt = 0
+    totalCnt = 0
     for entry in pg.get_entries(table):
         feature_page.append(entry)
-        if len(feature_page) == no_features_per_page:
+        totalCnt += 1
+        if len(feature_page) == no_features_per_page or totalCnt >= count:
             cnt += 1
-            print('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page)))
+            logging.debug('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page)))
             yield (cnt/no_pages, count, serialized_deferred_count, feature_page)
             feature_page = []
+            if totalCnt >= count:
+                return

     if cnt/no_pages != 1:
-        print('Yielding last page with %i features' % len(feature_page))
+        logging.debug('Yielding last page with %i features' % len(feature_page))
         yield (1, count, serialized_deferred_count, feature_page) # this will be the last feature_page
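A quick sanity check of the paging math for a timeseries source, assuming Python 3's true division (under Python 2's integer division, count/no_features_per_page would floor to 0). The table count of 123456 is made up for illustration:

    import math

    maxFeatures = 50
    no_features_per_page = 500
    count = min(123456, maxFeatures)   # capped to 50 for a timeseries database
    no_pages = int(math.ceil(count / no_features_per_page))
    print(no_pages)                    # 1, since ceil(50/500) == 1
    # The buffer never fills to 500 entries, so the new
    # 'or totalCnt >= count' condition is what yields the single short page,
    # and the early return stops the scan after 50 rows.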
@@ -37,6 +37,7 @@ def tag_doc( the_doc ):
     # isQueryable?
     tag_dict['isQueryable'] = False # default
+
     if 'link' in the_doc['metadata-fr'].keys():
         for link in the_doc['metadata-fr']['link']:
             #print(link)
@@ -44,6 +45,14 @@ def tag_doc( the_doc ):
                 tag_dict['isQueryable'] = True
                 break

+    tag_dict['isTimeseries'] = False # default
+    if 'link' in the_doc['metadata-fr'].keys():
+        for link in the_doc['metadata-fr']['link']:
+            #print(link)
+            if 'service' in link.keys() and link['service'] == 'WS' and 'timeseries' in link['url']:
+                tag_dict['isTimeseries'] = True
+                break
+
     # N.B.: in order to determine the following tags, we need the data-fr field;
     # in case the data-fr field is absent, the tags 'isSearchable',
     # 'isPunctual', 'isLinear', 'isAreal' will be absent instead of being 'falsely' set to false!
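To make the new tag concrete: a record carrying a 'WS' link whose URL mentions 'timeseries' gets isTimeseries = True. The record below is made up for illustration; only the 'service' and 'url' keys matter:

    the_doc = {
        'metadata-fr': {
            'link': [
                # hypothetical link, not a real dataset URL
                {'service': 'WS', 'url': 'https://data.grandlyon.com/ws/timeseries/sensors'},
            ]
        }
    }
    # tag_doc(the_doc) would set tag_dict['isTimeseries'] = True for this record.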
@@ -305,7 +305,7 @@ def callback( channel, method, properties, body ):
     for link in out_record['metadata-fr']['link']:
         #TODO: generalize!
-        if 'service' in link.keys() and link['service'] == 'WFS' and 'data.grandlyon.com' in link['url']:
+        if 'service' in link.keys() and link['service'] in ('WFS', 'WS') and 'data.grandlyon.com' in link['url']:
             logging.debug('EUREKA: found a WFS resource!')
             wfs_found = True
             full_version = out_record.copy() # including metadata AND data
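Since this branch now matches WS links as well, the debug message and the wfs_found flag name are slightly misleading. A small optional touch-up (an assumption, not part of the commit) would log the actual service:

    logging.debug('EUREKA: found a %s resource!' % link['service'])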