Skip to content
Snippets Groups Projects
Commit 60525ec4 authored by Alessandro Cerioni's avatar Alessandro Cerioni
Browse files

Deferred count of the number of records

parent 4d3e77d7
No related branches found
No related tags found
No related merge requests found
......@@ -18,3 +18,4 @@ flask
gunicorn
apscheduler
flask-executor
dill
......@@ -5,6 +5,7 @@ import json
import datetime
import os, sys
import math
import dill
from sqlalchemy.exc import NoSuchTableError
fileDir = os.path.dirname(os.path.abspath(__file__))
......@@ -33,6 +34,12 @@ def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
logging.debug('Table %s in schema % s not found :-(' % (table_name, schema))
return
def deferred_count():
    # Lazy record count: opens a fresh connection each time it is called, so
    # the count reflects the table's state at invocation time rather than at
    # generator-creation time. NOTE(review): relies on closure variables
    # (cfg, dbname, table) from the enclosing get_entries_from_postgis scope;
    # presumably dill preserves these through dumps/loads — confirm on the
    # consumer side before relying on the deserialized callable.
    local_pg = Remote(hostname=cfg['host'], dbname=dbname, username=cfg['username'], password=cfg['password'])
    return local_pg.count_entries(table)
serialized_deferred_count = dill.dumps(deferred_count)
count = pg.count_entries(table)
no_pages = int( math.ceil(count/no_features_per_page) )
......@@ -51,12 +58,12 @@ def get_entries_from_postgis( link, cfg, no_features_per_page=1000 ):
if len(feature_page) == no_features_per_page:
cnt += 1
logging.debug('Yielding page %i/%i, with %i features' % (cnt, no_pages, len(feature_page)))
yield (cnt/no_pages, count, feature_page)
yield (cnt/no_pages, count, serialized_deferred_count, feature_page)
feature_page = []
if cnt/no_pages != 1:
logging.debug('Yielding last page with %i features' % len(feature_page))
yield (1, count, feature_page) # this will be the last feature_page
yield (1, count, serialized_deferred_count, feature_page) # this will be the last feature_page
return
......@@ -227,7 +234,7 @@ def enrich_docs( channel, method, properties, body ):
postgis_cfg = cfg['postgis']
for progress_ratio, count, feature_page in get_entries_from_postgis(wfs_info, postgis_cfg):
for progress_ratio, count, serialized_deferred_count, feature_page in get_entries_from_postgis(wfs_info, postgis_cfg):
last_update = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
......@@ -253,6 +260,7 @@ def enrich_docs( channel, method, properties, body ):
msg['header']['cfg'] = cfg
msg['header']['progress_ratio'] = progress_ratio
msg['header']['count'] = count
msg['header']['serialized_deferred_count'] = serialized_deferred_count
msg['body'] = doc_page
the_body = msgpack.packb(msg, use_bin_type=True, default=encode_datetime)
......
......@@ -87,7 +87,8 @@ def index_docs(channel, method, properties, body):
cfg = decoded_body['header']['cfg']
progress_ratio = decoded_body['header']['progress_ratio']
count = decoded_body['header']['count']
count = decoded_body['header']['count']
serialized_deferred_count = decoded_body['header']['serialized_deferred_count']
es = Elasticsearch([cfg['indexer']['url']], timeout=60)
# es_logger = logging.getLogger('elasticsearch')
......@@ -145,9 +146,10 @@ def index_docs(channel, method, properties, body):
msg['header'] = dict()
msg['header']['cfg'] = cfg
msg['header']['count'] = count
msg['header']['serialized_deferred_count'] = serialized_deferred_count
msg['body'] = uuid
the_body = msgpack.packb(msg, use_bin_type=False)
the_body = msgpack.packb(msg, use_bin_type=True)
channel.basic_publish( exchange=exchange,
routing_key=routing_key,
......
......@@ -89,7 +89,8 @@ def process_docs( channel, method, properties, body ):
cfg = decoded_body['header']['cfg']
progress_ratio = decoded_body['header']['progress_ratio']
count = decoded_body['header']['count']
count = decoded_body['header']['count']
serialized_deferred_count = decoded_body['header']['serialized_deferred_count']
docs = decoded_body['body']
filename = os.path.join( parentDir, cfg['session']['working_directory'], 'field_types.json' )
......@@ -131,6 +132,7 @@ def process_docs( channel, method, properties, body ):
msg['header']['cfg'] = cfg
msg['header']['progress_ratio'] = progress_ratio
msg['header']['count'] = count
msg['header']['serialized_deferred_count'] = serialized_deferred_count
msg['body'] = docs_to_index
the_body = msgpack.packb(msg, use_bin_type=True)
......
......@@ -3,6 +3,7 @@ import json
import msgpack
import pika
import os, sys
import dill
from elasticsearch import Elasticsearch, NotFoundError
fileDir = os.path.dirname(os.path.abspath(__file__))
......@@ -12,6 +13,7 @@ sys.path.append(newPath)
from lib.my_logging import logging
from lib.exit_gracefully import exit_gracefully
from lib.locker import unlock
from lib.postgis_helper import Remote
class NotEmptyQueueException(Exception):
pass
......@@ -52,11 +54,12 @@ def create_sampling_task(cfg, channel, uuid):
def on_msg_callback(channel, method, properties, body):
decoded_body = msgpack.unpackb(body, raw=False)
cfg = decoded_body['header']['cfg']
uuid = decoded_body['body']
count_ref = decoded_body['header']['count']
serialized_deferred_count = decoded_body['header']['serialized_deferred_count']
deferred_count = dill.loads(serialized_deferred_count)
count_ref = deferred_count()
# from lib.elasticsearch_template import template
# template['index_patterns'] = [ cfg['reindexer']['destination_index'] ]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment