Skip to content
Snippets Groups Projects
enrich_links.py 6.51 KiB
Newer Older
  • Learn to ignore specific revisions
  • import xml.etree.ElementTree as ET
    import os.path, time
    import codecs
    import hashlib
    import requests
    
    from .my_logging import logging
    # ------------------------------------------------------------------------------
    
    def str_to_float( the_string ):
        """Convert the textual representation of a number into a float.

        This is a thin wrapper around the built-in ``float()``: the accepted
        inputs and the exceptions raised are exactly those of ``float()``.
        """
        as_float = float( the_string )
        return as_float
    
    def amend_epsg_label( epsg_label ):
        """Normalize a CRS label to the short ``EPSG:<code>`` form.

        Only the text following the last ``:`` of ``epsg_label`` is kept as the
        code; a label without any ``:`` is used as the code in its entirety.
        """
        # rpartition yields (head, separator, tail); the tail is the bare code
        _, _, code = epsg_label.rpartition(':')
        return "EPSG:%s" % code
    
    def analyze_wms_capabilities( capabilities, the_layer ):
        """Extract the projections and bounding boxes of one WMS layer.

        The capabilities document is searched for ``Layer`` elements (in the
        ``http://www.opengis.net/wms`` namespace) whose first ``Name`` element,
        at any depth below the layer, has ``the_layer`` as text.

        Args:
            capabilities: the capabilities document, as XML text.
            the_layer: the name of the layer to look for.

        Returns:
            An empty dict when no layer matches. Otherwise a dict with two keys:
            ``projections``, the list of ``CRS`` values found on the layer's
            ``BoundingBox`` elements, and ``bbox_by_projection``, which maps
            each of these ``CRS`` values to a dict holding every other
            attribute of the ``BoundingBox`` element converted to float.
            If several layers match, the last one wins.

        Raises:
            xml.etree.ElementTree.ParseError: if ``capabilities`` is not
                well-formed XML.
            ValueError: if a ``BoundingBox`` attribute other than ``CRS`` is
                not a valid number.
        """
        wms_ns = '{http://www.opengis.net/wms}'

        output = dict()
        root = ET.fromstring(capabilities)

        for layer in root.findall('.//' + wms_ns + 'Layer'):

            name = layer.find('.//' + wms_ns + 'Name')

            # do nothing if the layer has no name at all, or if its name doesn't
            # match with the one we are looking for
            if name is None or name.text != the_layer:
                continue

            output['bbox_by_projection'] = dict()
            output['projections'] = list()

            for bbox in layer.findall('.//' + wms_ns + 'BoundingBox'):

                crs = bbox.attrib.get('CRS')
                if crs is None:
                    # without a CRS there is no key to file this bbox under
                    continue

                output['projections'].append(crs)

                # The CRS attribute is a label (it is used as the key), not a
                # number: it must be left out of the float conversion.
                output['bbox_by_projection'][crs] = {
                    key: str_to_float( value )
                    for key, value in bbox.attrib.items()
                    if key != 'CRS'
                }

        return output
    
    def analyze_wfs_capabilities( capabilities, the_layer ):
        """Extract the projections supported by one WFS feature type.

        The capabilities document is searched for ``FeatureType`` elements (in
        the ``http://www.opengis.net/wfs/2.0`` namespace) whose ``Name`` is
        ``the_layer`` prefixed with either ``ms:`` or ``public:``.

        Args:
            capabilities: the capabilities document, as XML text.
            the_layer: the name of the layer to look for, without prefix.

        Returns:
            An empty dict when no feature type matches. Otherwise a dict with a
            single ``projections`` key: the list of ``EPSG:<code>`` labels built
            from the ``DefaultCRS`` element (first, when present) and from the
            ``OtherCRS`` elements. If several feature types match, the last
            one wins.

        Raises:
            xml.etree.ElementTree.ParseError: if ``capabilities`` is not
                well-formed XML.
        """
        wfs_ns = '{http://www.opengis.net/wfs/2.0}'

        output = dict()
        root = ET.fromstring(capabilities)

        accepted_names = ( "ms:" + the_layer, "public:" + the_layer )

        for layer in root.findall('.//' + wfs_ns + 'FeatureType'):

            layer_name = layer.find('.//' + wfs_ns + 'Name')

            # do nothing if the layer has no name at all, or if its name doesn't
            # match with the one we are looking for
            if layer_name is None or layer_name.text not in accepted_names:
                continue

            default_crs = layer.find('.//' + wfs_ns + 'DefaultCRS')
            other_crss  = layer.findall('.//' + wfs_ns + 'OtherCRS')

            projections = []

            # a feature type may come without a DefaultCRS element
            if default_crs is not None:
                projections.append( amend_epsg_label(default_crs.text) )

            for other_crs in other_crss:
                projections.append( amend_epsg_label(other_crs.text) )

            output['projections'] = projections

        return output
    
    def analyze_wcs_capabilities( capabilities, the_layer ):
        """Extract the projections and bounding boxes of one WCS coverage.

        The document is searched for ``CoverageDescription`` elements (WCS 1.1
        namespace) whose direct ``Identifier`` child has ``the_layer`` as text.
        For each matching coverage, every OWS ``BoundingBox`` whose ``crs``
        attribute mentions ``EPSG`` is recorded.

        Returns an empty dict when no coverage matches; otherwise a dict with
        ``projections`` (list of ``EPSG:<code>`` labels) and
        ``bbox_by_projection`` (label -> dict with the ``minx``, ``miny``,
        ``maxx`` and ``maxy`` floats read from the lower and upper corners).
        """
        wcs_ns = '{http://www.opengis.net/wcs/1.1}'
        ows_ns = '{http://www.opengis.net/ows/1.1}'

        result = {}
        tree = ET.fromstring( capabilities )

        for coverage in tree.findall('.//' + wcs_ns + 'CoverageDescription'):

            identifier = coverage.find(wcs_ns + 'Identifier')

            # only the coverage we were asked about is of interest
            if identifier.text != the_layer:
                continue

            boxes = {}
            labels = []
            result['bbox_by_projection'] = boxes
            result['projections'] = labels

            for box in coverage.findall('.//' + ows_ns + 'BoundingBox'):

                raw_crs = box.attrib['crs']

                # bounding boxes expressed in a non-EPSG CRS are ignored
                if 'EPSG' not in raw_crs:
                    continue

                label = amend_epsg_label(raw_crs)
                labels.append(label)

                # each corner reads "<x> <y>", separated by one single space
                low_x, low_y = box.find('./' + ows_ns + 'LowerCorner').text.split(' ')
                low_x = str_to_float(low_x)
                low_y = str_to_float(low_y)

                high_x, high_y = box.find('./' + ows_ns + 'UpperCorner').text.split(' ')
                high_x = str_to_float(high_x)
                high_y = str_to_float(high_y)

                boxes[label] = {'minx': low_x, 'miny': low_y, 'maxx': high_x, 'maxy': high_y}

        return result
    
    def get_capabilities( root_url, service, working_directory ):
        """Return the capabilities document of an OGC service, with a file cache.

        The document is cached in ``working_directory``, in a file named after
        the service and the MD5 digest of ``root_url``. It is downloaded again
        when no cached copy exists or when the cached copy is older than one
        day (86400 seconds).

        Args:
            root_url: the base URL of the service; the request parameters are
                appended to it.
            service: one of ``'wms'``, ``'wfs'`` or ``'wcs'``.
            working_directory: the directory holding the cached documents; it
                is created if needed.

        Returns:
            The content of the (possibly just refreshed) cached document, as
            text.

        Raises:
            ValueError: if ``service`` is not one of the supported values.
            requests.RequestException: if the download fails, including when
                the server answers with an HTTP error status.
            OSError: if the cache file cannot be written or read.
        """
        # Pick the separator so that the request parameters are appended
        # correctly whether or not root_url already carries a query string.
        if '?' not in root_url:
            local_root_url = root_url + '?'
        elif root_url.endswith(('?', '&')):
            local_root_url = root_url
        else:
            local_root_url = root_url + '&'

        if service == 'wms':
            query = 'version=1.3.0&service=WMS&request=GetCapabilities'
            prefix = 'WMS'
        elif service == 'wfs':
            query = 'request=GetCapabilities&service=WFS'
            prefix = 'WFS'
        elif service == 'wcs':
            query = 'version=1.1.0&service=WCS&request=DescribeCoverage'
            prefix = 'WCS'
        else:
            logging.error('Service not supported.')
            # nothing sensible can be fetched: fail explicitly instead of
            # tripping on an undefined name further down
            raise ValueError('Service not supported: %s' % service)

        url = local_root_url + query

        # the digest is only used to derive a stable, filesystem-safe file name
        md5 = hashlib.md5( root_url.encode("utf-8") ).hexdigest()
        filename = os.path.join( working_directory, '%s_Capabilities_%s.xml' % (prefix, md5) )

        try:
            age = time.time() - os.path.getmtime(filename) # seconds
        except OSError:
            # no (readable) cached copy yet
            age = None

        if age is None or age > 86400:
            logging.info('Fetching a new file...')
            res = requests.get(url)
            # do not cache an HTTP error page for a whole day
            res.raise_for_status()

            os.makedirs(working_directory, exist_ok=True)

            with codecs.open(filename, 'w', 'utf-8') as fp:
                fp.write(res.text)

        with codecs.open(filename, 'r', 'utf-8') as fp:
            capabilities = fp.read()

        return capabilities
    
    def enrich_links( links, working_directory ):
    
        logging.debug('Enriching links...')
    
        enriched_links = links.copy()
    
        for k, link in enumerate(links):
            if 'service' not in link.keys():
                continue
    
            try:
                if 'WMS' in link['service']:
                    capabilities = get_capabilities( link['url'], 'wms', working_directory )
                    info = analyze_wms_capabilities( capabilities, link['name'])
                    enriched_links[k] = {**info, **link}
                elif 'WFS' in link['service']:
                    capabilities = get_capabilities( link['url'], 'wfs', working_directory )
                    info = analyze_wfs_capabilities( capabilities, link['name'] )
                    enriched_links[k] = {**info, **link}
                elif 'WCS' in link['service']:
                    capabilities = get_capabilities( link['url'], 'wcs', working_directory )
                    info = analyze_wcs_capabilities( capabilities, link['name'] )
                    enriched_links[k] = {**info, **link}
                else:
                    pass
            except:
                logging.warning('Did not manage to get and/or analyze capabilities for the following service: %s' % link['url'])