from __future__ import annotations
import logging as log
from enum import Enum
from typing import Optional
import networkx as nx
from shapely.geometry import LineString
from mappymatch.constructs.geofence import Geofence
from mappymatch.utils.crs import LATLON_CRS, XY_CRS
from mappymatch.utils.exceptions import MapException
log.basicConfig(level=log.INFO)
METERS_TO_KM = 1 / 1000
DEFAULT_MPH = 30
[docs]
class NetworkType(Enum):
"""
Enumerator for Network Types supported by osmnx.
"""
ALL_PRIVATE = "all_private"
ALL = "all"
BIKE = "bike"
DRIVE = "drive"
DRIVE_SERVICE = "drive_service"
WALK = "walk"
[docs]
def nx_graph_from_osmnx(
geofence: Geofence,
network_type: NetworkType,
xy: bool = True,
custom_filter: Optional[str] = None,
) -> nx.MultiDiGraph:
"""
Build a networkx graph from OSM data
Args:
geofence: the geofence to clip the graph to
network_type: the network type to use for the graph
xy: whether to use xy coordinates or lat/lon
custom_filter: a custom filter to pass to osmnx
Returns:
a networkx graph of the OSM network
"""
try:
import osmnx as ox
except ImportError:
raise MapException(
"osmnx is not installed but is required for this map type"
)
ox.settings.log_console = False
raw_graph = ox.graph_from_polygon(
geofence.geometry,
network_type=network_type.value,
custom_filter=custom_filter,
)
return parse_osmnx_graph(raw_graph, network_type, xy=xy)
[docs]
def parse_osmnx_graph(
graph: nx.MultiDiGraph,
network_type: NetworkType,
xy: bool = True,
) -> nx.MultiDiGraph:
"""
Parse the raw osmnx graph into a graph that we can use with our NxMap
Args:
geofence: the geofence to clip the graph to
xy: whether to use xy coordinates or lat/lon
network_type: the network type to use for the graph
Returns:
a cleaned networkx graph of the OSM network
"""
try:
import osmnx as ox
except ImportError:
raise MapException(
"osmnx is not installed but is required for this map type"
)
ox.settings.log_console = False
g = graph
if xy:
g = ox.project_graph(g, XY_CRS)
crs = XY_CRS
else:
crs = LATLON_CRS
g = ox.add_edge_speeds(g)
g = ox.add_edge_travel_times(g)
length_meters = nx.get_edge_attributes(g, "length")
kilometers = {k: v * METERS_TO_KM for k, v in length_meters.items()}
nx.set_edge_attributes(g, kilometers, "kilometers")
# this makes sure there are no graph 'dead-ends'
sg_components = nx.strongly_connected_components(g)
if not sg_components:
raise MapException(
"road network has no strongly connected components and is not routable; "
"check polygon boundaries."
)
g = nx.MultiDiGraph(g.subgraph(max(sg_components, key=len)))
no_geom = 0
for u, v, d in g.edges(data=True):
if "geometry" not in d:
# we'll build a pseudo-geometry using the x, y data from the nodes
unode = g.nodes[u]
vnode = g.nodes[v]
line = LineString(
[(unode["x"], unode["y"]), (vnode["x"], vnode["y"])]
)
d["geometry"] = line
no_geom += 1
if no_geom:
print(
f"Warning: found {no_geom} links with no geometry; creating geometries from the node lat/lon"
)
g = compress(g)
g.graph["crs"] = crs
# TODO: these should all be sourced from the same location
g.graph["distance_weight"] = "kilometers"
g.graph["time_weight"] = "travel_time"
g.graph["geometry_key"] = "geometry"
g.graph["network_type"] = network_type.value
return g
[docs]
def compress(g) -> nx.MultiDiGraph:
"""
a hacky way to delete unnecessary data on the networkx graph
Args:
g: the networkx graph to compress
Returns:
the compressed networkx graph
"""
keys_to_delete = [
"oneway",
"ref",
"access",
"lanes",
"name",
"maxspeed",
"highway",
"length",
"speed_kph",
"osmid",
"street_count",
"junction",
"bridge",
"tunnel",
"y",
"x",
]
for _, _, d in g.edges(data=True):
for k in keys_to_delete:
try:
del d[k]
except KeyError:
continue
for _, d in g.nodes(data=True):
for k in keys_to_delete:
try:
del d[k]
except KeyError:
continue
return g