Source code for mappymatch.maps.nx.readers.osm_readers

from __future__ import annotations

import logging as log
from enum import Enum

import networkx as nx
from shapely.geometry import LineString

from mappymatch.constructs.geofence import Geofence
from mappymatch.utils.crs import LATLON_CRS, XY_CRS
from mappymatch.utils.exceptions import MapException

log.basicConfig(level=log.INFO)


METERS_TO_KM = 1 / 1000
DEFAULT_MPH = 30


[docs]class NetworkType(Enum): """ Enumerator for Network Types supported by osmnx. """ ALL_PRIVATE = "all_private" ALL = "all" BIKE = "bike" DRIVE = "drive" DRIVE_SERVICE = "drive_service" WALK = "walk"
[docs]def nx_graph_from_osmnx( geofence: Geofence, network_type: NetworkType, xy: bool = True ) -> nx.MultiDiGraph: """ Build a networkx graph from OSM data Args: geofence: the geofence to clip the graph to network_type: the network type to use for the graph xy: whether to use xy coordinates or lat/lon Returns: a networkx graph of the OSM network """ try: import osmnx as ox except ImportError: raise MapException( "osmnx is not installed but is required for this map type" ) ox.settings.log_console = False raw_graph = ox.graph_from_polygon( geofence.geometry, network_type=network_type.value ) return parse_osmnx_graph(raw_graph, network_type)
[docs]def parse_osmnx_graph( graph: nx.MultiDiGraph, network_type: NetworkType, xy: bool = True, ) -> nx.MultiDiGraph: """ Parse the raw osmnx graph into a graph that we can use with our NxMap Args: geofence: the geofence to clip the graph to xy: whether to use xy coordinates or lat/lon network_type: the network type to use for the graph Returns: a cleaned networkx graph of the OSM network """ try: import osmnx as ox except ImportError: raise MapException( "osmnx is not installed but is required for this map type" ) ox.settings.log_console = False g = graph if xy: g = ox.project_graph(g, XY_CRS) crs = XY_CRS else: crs = LATLON_CRS g = ox.add_edge_speeds(g) g = ox.add_edge_travel_times(g) length_meters = nx.get_edge_attributes(g, "length") kilometers = {k: v * METERS_TO_KM for k, v in length_meters.items()} nx.set_edge_attributes(g, kilometers, "kilometers") # this makes sure there are no graph 'dead-ends' sg_components = nx.strongly_connected_components(g) if not sg_components: raise MapException( "road network has no strongly connected components and is not routable; " "check polygon boundaries." ) g = nx.MultiDiGraph(g.subgraph(max(sg_components, key=len))) no_geom = 0 for u, v, d in g.edges(data=True): if "geometry" not in d: # we'll build a pseudo-geometry using the x, y data from the nodes unode = g.nodes[u] vnode = g.nodes[v] line = LineString( [(unode["x"], unode["y"]), (vnode["x"], vnode["y"])] ) d["geometry"] = line no_geom += 1 if no_geom: print( f"Warning: found {no_geom} links with no geometry; creating geometries from the node lat/lon" ) g = compress(g) g.graph["crs"] = crs # TODO: these should all be sourced from the same location g.graph["distance_weight"] = "kilometers" g.graph["time_weight"] = "travel_time" g.graph["geometry_key"] = "geometry" g.graph["network_type"] = network_type.value return g
[docs]def compress(g) -> nx.MultiDiGraph: """ a hacky way to delete unnecessary data on the networkx graph Args: g: the networkx graph to compress Returns: the compressed networkx graph """ keys_to_delete = [ "oneway", "ref", "access", "lanes", "name", "maxspeed", "highway", "length", "speed_kph", "osmid", "street_count", "junction", "bridge", "tunnel", "y", "x", ] for _, _, d in g.edges(data=True): for k in keys_to_delete: try: del d[k] except KeyError: continue for _, d in g.nodes(data=True): for k in keys_to_delete: try: del d[k] except KeyError: continue return g