Source code for libcity.model.map_matching.IVMM

import networkx as nx
import math
from logging import getLogger
from libcity.model.abstract_traffic_tradition_model import AbstractTraditionModel
from libcity.utils.GPS_utils import radian2angle, R_EARTH, angle2radian, dist, init_bearing
import numpy as np


[docs]class IVMM(AbstractTraditionModel): """ IVMM """ def __init__(self, config, data_feature): super().__init__(config, data_feature) # logger self._logger = getLogger() # model param self.k = config.get('k', 5) # maximum number of candidates of any sampling points self.r = config.get('r', 100) # radius of road segment candidate self.mu = config.get('mu', 0) self.sigma = config.get('sigma', 10) self.beta = config.get('beta', 40) self.window_size = config.get('window_size', 40) # data param self.with_time = data_feature.get('with_time', True) self.with_rd_speed = data_feature.get('with_rd_speed', True) self.delta_time = data_feature.get('delta_time', True) # data cache self.rd_nwk = None self.tra_id = None self.trajectory = None self.candidates = list() self.res_dct = dict() # To facilitate the search of candidate points, the road network is indexed using a grid. self.lon_r = None self.lat_r = None
[docs] def run(self, data): # road network & trajectory self.rd_nwk = data['rd_nwk'] trajectory = data['trajectory'] # set lon_radius and lat_radius based on the first Node of rd_nwk self._set_lon_lat_radius( self.rd_nwk.nodes[list(self.rd_nwk.nodes)[0]]['lon'], self.rd_nwk.nodes[list(self.rd_nwk.nodes)[0]]['lat'] ) # deal with every trajectories for usr_id, usr_value in trajectory.items(): self.usr_id = usr_id for traj_id, value in usr_value.items(): self._logger.info('begin map matching, usr_id:%d traj_id:%d' % (usr_id, traj_id)) self.traj_id = traj_id self.trajectory = value self._run_one_tra() self._logger.info('finish map matching, usr_id:%d traj_id:%d' % (usr_id, traj_id)) return self.res_dct
[docs] def _run_one_tra(self): """ run ST-Matching for one trajectory self.trajectory self.rd_nwk Returns: """ self._get_candidates() self._logger.info('finish getting candidates') self._observation_probability() self._logger.info('finish calculating observation probability') self._transmission_probability() self._logger.info('finish calculating transmission probability') if self.with_rd_speed and self.with_time: self._temporal_analysis() self._logger.info('finish spatial analysis') self._score_matrix() self._logger.info('finish building score matrix') self._interactive_voting() self._logger.info('finish interactive voting') self._find_matched_sequence() self._logger.info('finish finding matched sequence')
[docs] def _set_lon_lat_radius(self, lon, lat): """ get longitude range & latitude range (because radius is actually achieved by a grid search) Args: lon: longitude local lat: latitude local self.r Returns: self.lon_r self.lat_r """ # lat_r self.lat_r = radian2angle(self.r / R_EARTH) # lon_r r_prime = R_EARTH * math.cos(angle2radian(lat)) self.lon_r = radian2angle(self.r / r_prime)
def _point_edge_dist(self, lon, lat, edge): lat_origin = angle2radian(self.rd_nwk.nodes[edge[0]]['lat']) lon_origin = angle2radian(self.rd_nwk.nodes[edge[0]]['lon']) lat_dest = angle2radian(self.rd_nwk.nodes[edge[1]]['lat']) lon_dest = angle2radian(self.rd_nwk.nodes[edge[1]]['lon']) a = dist(angle2radian(lat), angle2radian(lon), lat_origin, lon_origin) b = dist(angle2radian(lat), angle2radian(lon), lat_dest, lon_dest) c = dist(lat_origin, lon_origin, lat_dest, lon_dest) # if origin point is the closest if b ** 2 > a ** 2 + c ** 2: return a, edge[0] # distance, point # if destination point is the closest elif a ** 2 > b ** 2 + c ** 2: return b, edge[1] if c == 0: return a, edge[0] # otherwise, calculate the Vertical length p = (a + b + c) / 2 s = math.sqrt(p * math.fabs(p - a) * math.fabs(p - b) * math.fabs(p - c)) return 2 * s / c, None
[docs] def _get_candidates(self): """ get candidates of each GPS sample with given road network Returns: self.candidates: a list of list. In each list are tuples (edge, distance, node) """ # get trajectory without time traj_lon_lat = self.trajectory[:, 1:3] assert traj_lon_lat.shape[1] == 2 # for every GPS sample for i in range(traj_lon_lat.shape[0]): candidate_i = set() lon, lat = traj_lon_lat[i, :] # for every edge for j in self.rd_nwk.edges: origin, dest = j[:2] lat_origin = self.rd_nwk.nodes[origin]['lat'] lon_origin = self.rd_nwk.nodes[origin]['lon'] lat_dest = self.rd_nwk.nodes[dest]['lat'] lon_dest = self.rd_nwk.nodes[dest]['lon'] if lat - self.lat_r <= lat_origin <= lat + self.lat_r \ and lon - self.lon_r <= lon_origin <= lon + self.lon_r \ or lat - self.lat_r <= lat_dest <= lat + self.lat_r \ and lon - self.lon_r <= lon_dest <= lon + self.lon_r: candidate_i.add((origin, dest)) elif lat - self.lat_r <= lat_origin / 2 + lat_dest / 2 <= lat + self.lat_r \ and lon - self.lon_r <= lon_origin / 2 + lon_dest / 2 <= lon + self.lon_r: candidate_i.add((origin, dest)) candidate_i_m = list() # (edge, distance, point) for edge in candidate_i: distance, node = self._point_edge_dist(lon, lat, edge) candidate_i_m.append((edge, distance, node)) candidate_i_m.sort(key=lambda a: a[1]) # asc candidate_i_m = candidate_i_m[:min(self.k, len(candidate_i_m))] candidate_i_k = dict() for edge, distance, node in candidate_i_m: candidate_i_k[edge] = {'distance': distance, 'node': node, "voted": 0, "pre_set": []} self.candidates.append(candidate_i_k)
[docs] def _observation_probability(self): """ Returns: """ # for candidates of every node for candidate_i in self.candidates: for edge, dct in candidate_i.items(): candidate_i[edge]['N'] = (1 / math.sqrt(2 * math.pi) / self.sigma * math.exp( - (dct['distance'] - self.mu) ** 2 / (2 * self.sigma ** 2)))
[docs] def _transmission_probability(self): """ Returns: """ i = 1 while i < len(self.candidates): # j and k j = i - 1 if len(self.candidates[i]) == 0: k = i + 1 while len(self.candidates[k]) == 0 and k < len(self.candidates): k += 1 if k == len(self.candidates): break else: k = i d = dist( angle2radian(self.trajectory[j][2]), angle2radian(self.trajectory[j][1]), angle2radian(self.trajectory[k][2]), angle2radian(self.trajectory[k][1]) ) for edge_j, dct_j in self.candidates[j].items(): for edge_k, dct_k in self.candidates[k].items(): brng_jk = init_bearing( angle2radian(self.trajectory[j][2]), angle2radian(self.trajectory[j][1]), angle2radian(self.trajectory[k][2]), angle2radian(self.trajectory[k][1]) ) brng_edge_j = init_bearing( angle2radian(self.rd_nwk.nodes[edge_j[0]]['lat']), angle2radian(self.rd_nwk.nodes[edge_j[0]]['lon']), angle2radian(self.rd_nwk.nodes[edge_j[1]]['lat']), angle2radian(self.rd_nwk.nodes[edge_j[1]]['lon']), ) try: if dct_j['node'] is not None and dct_k['node'] is not None: result = d / nx.astar_path_length(self.rd_nwk, dct_j['node'], dct_k['node'], weight='distance') elif dct_j['node'] is not None: nd2_origin = edge_k[0] lon, lat = self.rd_nwk.nodes[nd2_origin]['lon'], self.rd_nwk.nodes[nd2_origin]['lat'] path_len = nx.astar_path_length(self.rd_nwk, dct_j['node'], nd2_origin, weight='distance') path_len += math.sqrt( math.fabs( dist( angle2radian(self.trajectory[k][2]), angle2radian(self.trajectory[k][1]), angle2radian(lat), angle2radian(lon) ) ** 2 - dct_k['distance'] ** 2 ) ) if edge_j[1] == dct_j['edge']: path_len += self.rd_nwk[edge_j[0]][edge_j[1]]['distance'] * 2 result = d / path_len elif dct_k['node'] is not None: nd1_destination = edge_j[1] lon, lat = self.rd_nwk.nodes[nd1_destination]['lon'], self.rd_nwk.nodes[nd1_destination][ 'lat'] path_len = nx.astar_path_length(self.rd_nwk, nd1_destination, dct_k['node'], weight='distance') path_len += math.sqrt( math.fabs( dist( angle2radian(self.trajectory[j][2]), angle2radian(self.trajectory[j][1]), angle2radian(lat), angle2radian(lon) ) ** 2 - dct_j['distance'] ** 2 ) ) if edge_k[1] == dct_k['node']: path_len += self.rd_nwk[edge_k[0]][edge_k[1]]['distance'] * 2 result = d / path_len else: if edge_j == edge_k and math.fabs(brng_edge_j - brng_jk) < 90: result = 1 else: nd1_destination = edge_j[1] lon1, lat1 = self.rd_nwk.nodes[nd1_destination]['lon'], \ self.rd_nwk.nodes[nd1_destination]['lat'] nd2_origin = edge_k[0] lon2, lat2 = self.rd_nwk.nodes[nd2_origin]['lon'], self.rd_nwk.nodes[nd2_origin]['lat'] result = d / ( nx.astar_path_length(self.rd_nwk, nd1_destination, nd2_origin, weight='distance') + math.sqrt( math.fabs( dist( angle2radian(self.trajectory[j][2]), angle2radian(self.trajectory[j][1]), angle2radian(lat1), angle2radian(lon1) ) ** 2 - dct_j['distance'] ** 2 ) ) + math.sqrt( math.fabs( dist( angle2radian(self.trajectory[k][2]), angle2radian(self.trajectory[k][1]), angle2radian(lat2), angle2radian(lon2) ) ** 2 - dct_k['distance'] ** 2 ) ) ) if 'V' in dct_j.keys(): dct_j['V'][edge_k] = min(result, 1) else: dct_j['V'] = {edge_k: min(result, 1)} except: if 'V' in dct_j.keys(): dct_j['V'][edge_k] = 0 else: dct_j['V'] = {edge_k: 0} i += 1
[docs] def _temporal_analysis(self): """ Returns: """
# TODO def _score_matrix(self): sub_static_matrices = list() # for every GPS sample for i in range(len(self.candidates)): # current candidates: self.candidates[i] # prev candidates: self.candidates[j] # no current candidates if i == 0: continue if len(self.candidates[i]) == 0: sub_static_matrix = np.zeros((1, 1)) sub_static_matrices.append(sub_static_matrix) continue # if there are current candidates, find prev index j j = i - 1 while j >= 0 and len(self.candidates[j]) == 0: j -= 1 # j >= 0, calculate score of candidates of GPS sample i if j < 0: sub_static_matrix = np.zeros((1, 1)) sub_static_matrices.append(sub_static_matrix) continue sub_static_matrix = np.zeros((len(self.candidates[j].items()), len(self.candidates[i].items()))) n = 0 for edge, dct in self.candidates[i].items(): m = 0 for edge_pre, dct_pre in self.candidates[j].items(): sub_static_matrix[m][n] = dct['N'] * dct_pre['V'][edge] * ( 1 if 'T' not in dct_pre.keys() else dct_pre['T'][edge]) m += 1 n += 1 sub_static_matrices.append(sub_static_matrix) traj_lon_lat = self.trajectory[:, 1:3] self.traj_points = [] for i in range(len(traj_lon_lat)): W = [] j = 0 while j < len(traj_lon_lat): if i == j: j += 1 continue lon_i, lat_i = traj_lon_lat[i, :] lon_j, lat_j = traj_lon_lat[j, :] euclid_distance = dist(angle2radian(lat_i), angle2radian(lon_i), angle2radian(lat_j), angle2radian(lon_j)) W.append(pow(math.e, -(euclid_distance ** 2 / self.beta ** 2))) j += 1 fai = [] self.sub_static_matrices = sub_static_matrices self.W = W for k in range(len(W)): fai.append(sub_static_matrices[k] * W[k]) traj_point = {"time": self.trajectory[i][0], "lon": self.trajectory[i][1], "lat": self.trajectory[i][2], "fai": fai, "candidates": self.candidates[i]} self.traj_points.append(traj_point) # Φ(fai) is the weighted score matrix def _interactive_voting(self): for traj_point in self.traj_points: fai = traj_point["fai"] for candidate in traj_point["candidates"].values(): self._find_sequence(fai, candidate) def _find_sequence(self, fai, candidate): for i in range(len(self.candidates)): if candidate not in self.candidates[i].values(): i += 1 else: break current_p = i j = 0 for c in self.candidates[i].values(): if candidate == c: current_c = j break else: j += 1 # get the location of current traj_point i += 1 if i == len(self.candidates): # traj_point is in the last column last_point = None last_score = 0 while i < len(self.candidates): if len(self.candidates[i]) == 0: i += 1 if i == len(self.candidates): last_point = None last_score = 0 continue # ensure i has candidates j = i - 1 while j > current_p and len(self.candidates[j]) == 0: j -= 1 # ensure j has candidates k = 0 for edge, dct in self.candidates[i].items(): max_score = -float("inf") if j == current_p: tmp = fai[i - 1][current_c][k] if tmp > max_score: dct["score"] = tmp else: m = 0 for edge_pre, dct_pre in self.candidates[j].items(): tmp = dct_pre['score'] + fai[i - 1][m][k] if tmp > max_score: max_score = tmp pre = dct_pre m += 1 dct["score"] = max_score for c in pre["pre_set"]: dct["pre_set"].append(c) dct["pre_set"].append(pre) k += 1 i += 1 if i == len(self.candidates): last_point = dct last_score = dct["score"] i = current_p - 1 if i == -1: first_point = None first_score = 0 while i >= 0: if len(self.candidates[i]) == 0: i -= 1 if i == -1: first_point = dct first_score = dct["score"] continue j = i + 1 while j < current_p and len(self.candidates[j]) == 0: j += 1 k = 0 for edge, dct in self.candidates[i].items(): max_score = -float("inf") if j == current_p: tmp = fai[j - 1][k][current_c] if tmp > max_score: dct["score"] = tmp else: m = 0 for edge_pre, dct_pre in self.candidates[j].items(): tmp = dct_pre['score'] + fai[j - 1][k][m] if tmp > max_score: max_score = tmp pre = dct_pre m += 1 dct["score"] = max_score for c in pre["pre_set"]: dct["pre_set"].append(c) dct["pre_set"].append(pre) k += 1 i -= 1 if i == -1: first_point = dct first_score = dct["score"] candidate["fValue"] = first_score + last_score if first_point is not None: for point in first_point["pre_set"]: point["voted"] += 1 if last_point is not None: for point in last_point["pre_set"]: point["voted"] += 1 candidate["voted"] += 1 for traj_point in self.traj_points: for c in traj_point["candidates"].values(): c["pre_set"] = [] def _find_matched_sequence(self): res = [] for traj_point in self.traj_points: if len(traj_point["candidates"]) != 0: max_voted = 0 fValue = 0 for key in traj_point["candidates"].keys(): candidate = traj_point["candidates"][key] tmp = candidate["voted"] if tmp > max_voted: max_voted = tmp fValue = candidate["fValue"] max_voted_key = key elif tmp == max_voted: if candidate["fValue"] > fValue: max_voted = tmp fValue = candidate["fValue"] max_voted_key = key else: max_voted_key = None res.append(max_voted_key) # to geo_id res_lst_rel = np.array(list(map(lambda x: self.rd_nwk.edges[x]['geo_id'] if x is not None else None, res))) dyna_id_lst = self.trajectory[:, 0].astype(int) if self.with_time: time_lst = self.trajectory[:, 3] res_all = np.stack([dyna_id_lst, res_lst_rel, time_lst], axis=1) else: res_all = np.stack([dyna_id_lst, res_lst_rel], axis=1) # set self.res_dct if self.usr_id in self.res_dct.keys(): self.res_dct[self.usr_id][self.traj_id] = res_all else: self.res_dct[self.usr_id] = {self.traj_id: res_all}