"""Helpers to fetch, cache and analyze OGC (WMS / WFS / WCS) capabilities documents."""
import xml.etree.ElementTree as ET
import os.path, time
import codecs
import hashlib
import requests
from .my_logging import logging
# ------------------------------------------------------------------------------
def str_to_float( the_string ):
    """Convert *the_string* to a float with the built-in ``float`` constructor."""
    value = float(the_string)
    return value
def amend_epsg_label( epsg_label ):
    """Rewrite a CRS label in the short ``EPSG:<code>`` form.

    Only the text following the last ``:`` of *epsg_label* is kept as the
    code, e.g. ``urn:ogc:def:crs:EPSG::4326`` becomes ``EPSG:4326``. A label
    that contains no ``:`` at all is used as the code in full.
    """
    # rpartition leaves the whole label in the last slot when no ':' is found
    _, _, code = epsg_label.rpartition(':')
    return "EPSG:" + code
def analyze_wms_capabilities( capabilities, the_layer ):
    """Extract the bounding boxes and projections of a layer from a WMS capabilities document.

    Args:
        capabilities: The WMS capabilities document, as XML text.
        the_layer: Name of the layer to look for.

    Returns:
        An empty dict when no ``Layer`` element matches. Otherwise a dict with
        two keys: ``'projections'``, the list of ``CRS`` attribute values of
        the ``BoundingBox`` elements found under the layer, and
        ``'bbox_by_projection'``, mapping each of those CRS values to the
        remaining ``BoundingBox`` attributes converted with ``str_to_float``.
        When several ``Layer`` elements match, the last one in document order
        wins.
    """
    wms_ns = '{http://www.opengis.net/wms}'

    output = dict()
    root = ET.fromstring(capabilities)

    for layer in root.findall('.//' + wms_ns + 'Layer'):
        name = layer.find('.//' + wms_ns + 'Name')

        # do nothing if the layer has no name at all (find() returned None) or
        # if its name doesn't match with the one we are looking for
        if name is None or name.text != the_layer:
            continue

        bbox_by_projection = dict()
        projections = list()

        for bbox in layer.findall('.//' + wms_ns + 'BoundingBox'):
            crs = bbox.attrib['CRS']
            projections.append(crs)
            # every attribute but the CRS itself is a numeric bound / resolution
            bbox_by_projection[crs] = {
                key: str_to_float( value )
                for key, value in bbox.attrib.items()
                if key != 'CRS'
            }

        output['bbox_by_projection'] = bbox_by_projection
        output['projections'] = projections

    return output
def analyze_wfs_capabilities( capabilities, the_layer ):
    """Extract the projections advertised for a feature type in a WFS 2.0 capabilities document.

    A ``FeatureType`` element matches when its name equals *the_layer*
    prefixed with either ``ms:`` or ``public:``.

    Args:
        capabilities: The WFS capabilities document, as XML text.
        the_layer: Un-prefixed name of the feature type to look for.

    Returns:
        An empty dict when no feature type matches. Otherwise a dict with a
        single ``'projections'`` key: the list of CRS labels, rewritten by
        ``amend_epsg_label``, with the ``DefaultCRS`` (when present) first and
        the ``OtherCRS`` entries after it. When several feature types match,
        the last one in document order wins.
    """
    wfs_ns = '{http://www.opengis.net/wfs/2.0}'
    accepted_names = ( "ms:" + the_layer, "public:" + the_layer )

    output = dict()
    root = ET.fromstring(capabilities)

    for layer in root.findall('.//' + wfs_ns + 'FeatureType'):
        layer_name = layer.find('.//' + wfs_ns + 'Name')

        # do nothing if the layer has no name or if its name doesn't match
        # with the one we are looking for
        if layer_name is None or layer_name.text not in accepted_names:
            continue

        default_crs = layer.find('.//' + wfs_ns + 'DefaultCRS')
        other_crss = layer.findall('.//' + wfs_ns + 'OtherCRS')

        # a feature type may come without DefaultCRS: then only the OtherCRS
        # entries (possibly none) are reported
        projections = []
        if default_crs is not None:
            projections.append( amend_epsg_label(default_crs.text) )
        for other_crs in other_crss:
            projections.append( amend_epsg_label(other_crs.text) )

        output['projections'] = projections

    return output
def analyze_wcs_capabilities( capabilities, the_layer ):
    """Extract the EPSG bounding boxes of a coverage from a WCS 1.1 coverage description document.

    Args:
        capabilities: The WCS document, as XML text.
        the_layer: Identifier of the coverage to look for.

    Returns:
        An empty dict when no ``CoverageDescription`` matches. Otherwise a
        dict with two keys: ``'projections'``, the list of CRS labels
        (rewritten by ``amend_epsg_label``) of the ``BoundingBox`` elements
        whose ``crs`` attribute mentions ``EPSG``, and
        ``'bbox_by_projection'``, mapping each of those labels to a
        ``{'minx', 'miny', 'maxx', 'maxy'}`` dict of floats. When several
        coverage descriptions match, the last one in document order wins.
    """
    wcs_ns = '{http://www.opengis.net/wcs/1.1}'
    ows_ns = '{http://www.opengis.net/ows/1.1}'

    output = dict()
    root = ET.fromstring( capabilities )

    for layer in root.findall('.//' + wcs_ns + 'CoverageDescription'):
        name = layer.find(wcs_ns + 'Identifier')

        # do nothing if the coverage has no identifier or if it doesn't match
        # with the one we are looking for
        if name is None or name.text != the_layer:
            continue

        output['bbox_by_projection'] = dict()
        output['projections'] = list()

        for bbox in layer.findall('.//' + ows_ns + 'BoundingBox'):
            # bounding boxes without a crs attribute, or with a non-EPSG one,
            # are ignored
            raw_crs = bbox.attrib.get('crs', '')
            if 'EPSG' not in raw_crs:
                continue

            crs = amend_epsg_label(raw_crs)
            output['projections'].append(crs)

            # split() without argument tolerates repeated or surrounding
            # whitespace between the two coordinates
            lower_corner = bbox.find('./' + ows_ns + 'LowerCorner')
            minx, miny = lower_corner.text.split()
            upper_corner = bbox.find('./' + ows_ns + 'UpperCorner')
            maxx, maxy = upper_corner.text.split()

            output['bbox_by_projection'][crs] = {
                'minx': str_to_float(minx),
                'miny': str_to_float(miny),
                'maxx': str_to_float(maxx),
                'maxy': str_to_float(maxy),
            }

    return output
def get_capabilities( root_url, service, working_directory ):
    """Return the capabilities document of an OGC service, through a one-day file cache.

    The document is stored in *working_directory*, in a file whose name is
    derived from the service type and the MD5 hex digest of *root_url* (as
    passed in). It is downloaded again when that file is missing or older
    than 86400 seconds.

    Args:
        root_url: Base URL of the service, with or without a trailing ``?``.
        service: One of ``'wms'``, ``'wfs'`` or ``'wcs'``.
        working_directory: Directory holding the cached files; created when
            needed.

    Returns:
        The content of the (possibly just refreshed) cached file, as text.

    Raises:
        ValueError: If *service* is not supported.
        requests.HTTPError: If the download answers with an HTTP error
            status; nothing is written to the cache in that case.
    """
    # query string and cache-file prefix for each supported service
    requests_by_service = {
        'wms': ('version=1.3.0&service=WMS&request=GetCapabilities', 'WMS'),
        'wfs': ('request=GetCapabilities&service=WFS', 'WFS'),
        'wcs': ('version=1.1.0&service=WCS&request=DescribeCoverage', 'WCS'),
    }

    if service not in requests_by_service:
        logging.error('Service not supported.')
        # fail here, explicitly, rather than later on an unbound url / filename
        raise ValueError('Service not supported: %r' % (service,))

    query, prefix = requests_by_service[service]

    if root_url.endswith('?'):
        local_root_url = root_url
    else:
        local_root_url = root_url + '?'
    url = local_root_url + query

    md5 = hashlib.md5( root_url.encode("utf-8") ).hexdigest()
    filename = os.path.join( working_directory, '%s_Capabilities_%s.xml' % (prefix, md5) )

    try:
        age = time.time() - os.path.getmtime(filename) # seconds
    except OSError:
        # no (readable) cached copy yet
        age = None

    if age is None or age > 86400:
        logging.info('Fetching a new file...')
        res = requests.get(url)
        # do not let an HTTP error page overwrite / poison the cache
        res.raise_for_status()

        os.makedirs(working_directory, exist_ok=True)

        with codecs.open(filename, 'w', 'utf-8') as fp:
            fp.write(res.text)

    with codecs.open(filename, 'r', 'utf-8') as fp:
        capabilities = fp.read()

    return capabilities
def enrich_links( links, working_directory ):
    """Add projection / bounding-box information to the links pointing to OGC services.

    For each link having a ``'service'`` entry that contains ``WMS``, ``WFS``
    or ``WCS`` (checked in that order), the capabilities of ``link['url']``
    are fetched and analyzed for the layer ``link['name']``, and the result is
    merged into a copy of the link. On key clashes the link's own values win.
    Links for which this fails are kept unchanged and a warning is logged.

    Args:
        links: List of dict-like link descriptions. Neither the list nor its
            items are modified.
        working_directory: Directory used to cache the capabilities files.

    Returns:
        A new list, of the same length and order as *links*.
    """
    logging.debug('Enriching links...')

    enriched_links = links.copy()

    for k, link in enumerate(links):

        if 'service' not in link:
            continue

        # Best effort: whatever goes wrong for one link must not prevent the
        # others from being enriched. Exception (rather than a bare except)
        # lets KeyboardInterrupt / SystemExit through.
        try:
            service = link['service']

            if 'WMS' in service:
                kind, analyze = 'wms', analyze_wms_capabilities
            elif 'WFS' in service:
                kind, analyze = 'wfs', analyze_wfs_capabilities
            elif 'WCS' in service:
                kind, analyze = 'wcs', analyze_wcs_capabilities
            else:
                continue

            capabilities = get_capabilities( link['url'], kind, working_directory )
            info = analyze( capabilities, link['name'] )
            enriched_links[k] = {**info, **link}

        except Exception:
            # .get(): the failure may precisely be a missing 'url'
            logging.warning('Did not manage to get and/or analyze capabilities for the following service: %s' % link.get('url'))

    return enriched_links