Source code for mappymatch.matchers.osrm

from __future__ import annotations

import logging

import requests

from mappymatch.constructs.match import Match
from mappymatch.constructs.road import Road, RoadId
from mappymatch.constructs.trace import Trace
from mappymatch.matchers.matcher_interface import MatcherInterface, MatchResult
from mappymatch.utils.crs import LATLON_CRS
from mappymatch.utils.url import multiurljoin

log = logging.getLogger(__name__)

DEFAULT_OSRM_ADDRESS = "http://router.project-osrm.org"


def parse_osrm_json(j: dict, trace: Trace) -> list[Match]:
    """
    parse the json response from the osrm match service

    we're looking for the osm node ids which should be in the form:

    { 'matchings':
        [{ 'legs':
            [{ 'annotation':
                {'nodes': [node_id, node_id] }
            }]
        }]
    }

    Only the first entry of 'matchings' is read. Each leg becomes one Match
    whose road id runs from the first node of the leg to the last node of
    the leg.

    :param j: the json object
    :param trace: the trace that was matched; leg ``i`` is paired with
        ``trace.coords[i]``
    :return: a list with one Match per leg, in leg order
    :raises ValueError: if the response has no matchings, the first matching
        has no legs, or a leg lacks annotation / node information
    """
    matchings = j.get("matchings")
    if not matchings:
        raise ValueError("could not find any link matchings in response")

    legs = matchings[0].get("legs")
    if not legs:
        raise ValueError("could not find any link legs in response")

    def _parse_leg(d: dict, i: int) -> Match:
        annotation = d.get("annotation")
        if not annotation:
            raise ValueError("leg has no annotation information")
        nodes = annotation.get("nodes")
        if not nodes:
            raise ValueError("leg has no osm node information")
        # a leg travels from its first node to its last node; for a
        # single-node leg both ends are the same node
        origin_junction_id = f"{nodes[0]}"
        destination_junction_id = f"{nodes[-1]}"

        # TODO: we need to get geometry, distance info from OSRM if available
        road_id = RoadId(origin_junction_id, destination_junction_id, 0)
        road = Road(
            road_id=road_id,
            geom=None,
        )
        # no distance is available yet (see TODO above), so it is recorded
        # as infinity
        return Match(
            road=road, coordinate=trace.coords[i], distance=float("infinity")
        )

    return [_parse_leg(d, i) for i, d in enumerate(legs)]


class OsrmMatcher(MatcherInterface):
    """
    pings an OSRM server for map matching
    """

    def __init__(
        self,
        osrm_address=DEFAULT_OSRM_ADDRESS,
        osrm_profile="driving",
        osrm_version="v1",
    ):
        """
        build the base url of the match endpoint

        :param osrm_address: the address of the osrm server
        :param osrm_profile: the profile segment of the request url
        :param osrm_version: the version segment of the request url
        """
        self.osrm_api_base = multiurljoin(
            [osrm_address, "match", osrm_version, osrm_profile]
        )

    def match_trace(self, trace: Trace) -> MatchResult:
        """
        match a single trace by sending its coordinates to the osrm server

        :param trace: the trace to match; must be in the lat/lon CRS
        :return: the match result parsed from the server response
        :raises TypeError: if the trace is not in the lat/lon CRS
        :raises requests.HTTPError: if the server answers with an error status
        :raises ValueError: if the response can not be parsed
        """
        if not trace.crs == LATLON_CRS:
            raise TypeError(
                f"this matcher requires traces to be in the CRS of EPSG:{LATLON_CRS.to_epsg()} "
                f"but found EPSG:{trace.crs.to_epsg()}"
            )

        # requests are capped at 100 coordinates
        # NOTE(review): presumably this mirrors a server-side limit - confirm
        if len(trace.coords) > 100:
            trace = trace.downsample(100)

        # "x,y" pairs separated by semicolons, with no trailing semicolon
        coordinate_str = ";".join(
            f"{coord.x},{coord.y}" for coord in trace.coords
        )

        osrm_request = (
            self.osrm_api_base + coordinate_str + "?annotations=true"
        )
        # library code should not write to stdout; use the module logger
        log.debug("osrm request: %s", osrm_request)

        r = requests.get(osrm_request)
        # raises only for 4xx / 5xx responses
        r.raise_for_status()

        result = parse_osrm_json(r.json(), trace)

        return MatchResult(result)

    def match_trace_batch(self, trace_batch: list[Trace]) -> list[MatchResult]:
        """
        match each trace in the batch, one request at a time

        :param trace_batch: the traces to match
        :return: one match result per trace, in input order
        """
        return [self.match_trace(t) for t in trace_batch]