Source code for libcity.data.dataset.dataset_subclass.gsnet_dataset

import datetime

import numpy as np
import pandas as pd


from libcity.data.dataset import TrafficStateCPTDataset


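# The loaders below assume the dataset's .geo file carries, for every graph
# node, its grid coordinates and risk-mask value.  An illustrative (made-up)
# layout, not taken from any bundled dataset:
#
#     geo_id,row_id,column_id,risk_mask
#     0,0,0,1.0
#     1,0,2,0.0
#
# The .rel file is expected to provide one weight column per entry of the
# `weight_col` config list, used here for the road, risk, and optional POI
# adjacency matrices.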
class GSNetDataset(TrafficStateCPTDataset):

    def __init__(self, config):
        # initialize here so that self._load_rel() can be called properly
        self.weight_col_list = config.get('weight_col', [])
        super(GSNetDataset, self).__init__(config)
        # needed to load the dyna file properly
        self.len_row = config.get('grid_len_row', None)
        self.len_column = config.get('grid_len_column', None)
        # NOTE: the configured value does NOT account for the
        # time-of-day and day-of-week rows added below
        self.num_of_target_time_feature = self.config.get('num_of_target_time_feature', 0)
        self.grid_in_channel = len(self.config.get('data_col', []))
        if self.add_time_in_day:
            self.num_of_target_time_feature += 24
            self.grid_in_channel += 24
        if self.add_day_in_week:
            self.num_of_target_time_feature += 7
            self.grid_in_channel += 7
        self.data_col_risk_mask = self.config.get('data_col_risk_mask', 'risk_mask')
        self.data_col_grid_node_map = self.config.get('data_col_grid_node_map', 'grid_node_map')
        self._load_risk_mask(self.dataset)

    def _load_rel(self):
        # save state that the parent implementation overwrites
        try:
            orig_weight_col = self.weight_col
        except AttributeError:
            orig_weight_col = None
        try:
            orig_adj_mx = self.adj_mx
        except AttributeError:
            orig_adj_mx = None
        try:
            orig_distance_df = self.distance_df
        except AttributeError:
            orig_distance_df = None
        # reuse the parent loader once per weight column
        try:
            self.weight_col = self.weight_col_list[0]
            super(GSNetDataset, self)._load_rel()
            self.road_adj = self.adj_mx
            self.weight_col = self.weight_col_list[1]
            super(GSNetDataset, self)._load_rel()
            self.risk_adj = self.adj_mx
            if len(self.weight_col_list) > 2:
                self.weight_col = self.weight_col_list[2]
                super(GSNetDataset, self)._load_rel()
                self.poi_adj = self.adj_mx
        finally:
            self.weight_col = orig_weight_col
            self.adj_mx = orig_adj_mx
            self.distance_df = orig_distance_df

    def _load_dyna(self, filename):
        # dynamic data must be 4D (grid-shaped) in this model,
        # so fake grid-based geo ids while loading
        orig_geo_ids = self.geo_ids
        self.geo_ids = [i * self.len_column + j
                        for i in range(self.len_row)
                        for j in range(self.len_column)]
        result = super(GSNetDataset, self)._load_grid_4d(filename)
        self.geo_ids = orig_geo_ids
        return result

    # for grid-based auxiliary matrices
    def _load_risk_mask(self, filename):
        self._logger.info("Loading file " + filename + ".geo")
        df = pd.read_csv(self.data_path + filename + '.geo')
        len_row, len_column = self.len_row, self.len_column
        num_graph_nodes = len(df)
        # walk the grid in row-major order; cells without a graph node get 0.0
        risk_mask_values = []
        k = 0
        for i in range(len_row):
            for j in range(len_column):
                if k < num_graph_nodes and i == df['row_id'][k] and j == df['column_id'][k]:
                    risk_mask_values.append(df['risk_mask'][k])
                    k += 1
                else:
                    risk_mask_values.append(0.0)
        # one-hot map from flattened grid cells to graph nodes
        grid_node_map_values = []
        for i in range(len_row * len_column):
            grid_node_map_values.append([0.0] * num_graph_nodes)
        for i in range(num_graph_nodes):
            index = df['row_id'][i] * len_column + df['column_id'][i]
            grid_node_map_values[index][df['geo_id'][i]] = 1.0
        # column first, reflecting the model's preference
        # (note: a reshape, not a transpose)
        self.risk_mask = np.array(risk_mask_values, dtype=np.float32).reshape(len_column, len_row)
        self.grid_node_map = np.array(grid_node_map_values, dtype=np.float32).reshape(len_column * len_row, num_graph_nodes)

    def _get_external_array(self, ts, ext_data=None, previous_ext=False):
        # one-hot time encoding that differs from ordinary datasets
        ts_count = len(ts)
        data_list = []
        if self.add_time_in_day:
            # hour of day for each timestamp
            time_indices = ((ts - ts.astype("datetime64[D]")) / np.timedelta64(1, "h")).astype("int")
            curr = np.zeros((ts_count, 24))  # [ts_count, 24]
            curr[np.arange(0, ts_count), time_indices] = 1
            data_list.append(curr)
        if self.add_day_in_week:
            week_indices = []
            for day in ts.astype("datetime64[D]"):
                week_indices.append(datetime.datetime.strptime(str(day), '%Y-%m-%d').weekday())
            curr = np.zeros((ts_count, 7))  # [ts_count, 7]
            curr[np.arange(0, ts_count), week_indices] = 1
            data_list.append(curr)
        if ext_data is not None:
            indices = []
            for ts_ in ts:
                if previous_ext:
                    ts_index = self.idx_of_ext_timesolts[ts_ - self.offset_frame]
                else:
                    ts_index = self.idx_of_ext_timesolts[ts_]
                indices.append(ts_index)
            select_data = ext_data[indices]
            data_list.append(select_data)
        if len(data_list) > 0:
            data = np.concatenate(data_list, axis=1)
        else:
            data = np.zeros((len(ts), 0))
        return data
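
    # Worked example (illustrative values, not from any bundled dataset):
    # for an hourly timestamp ts = 2020-01-01T05 (a Wednesday),
    # _get_external_array one-hot encodes the hour of day as a 24-wide row
    # with a 1 at column 5, and the day of week as a 7-wide row with a 1 at
    # column 2 (Monday = 0); the parts are concatenated along axis 1, giving
    # 24 + 7 = 31 external columns per timestamp, plus any columns taken
    # from ext_data.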

    def get_data_feature(self):
        d = {
            "scaler": self.scaler,
            "num_batches": self.num_batches,
            "feature_dim": self.feature_dim,
            "ext_dim": self.ext_dim,
            "output_dim": self.output_dim,
            "len_row": self.len_row,
            "len_column": self.len_column,
        }
        d['risk_mask'] = self.risk_mask
        d['road_adj'] = self.road_adj
        d['risk_adj'] = self.risk_adj
        d['poi_adj'] = self.poi_adj if hasattr(self, 'poi_adj') else None
        d['grid_node_map'] = self.grid_node_map
        d['num_of_target_time_feature'] = self.num_of_target_time_feature
        lp = self.len_period * (self.pad_forward_period + self.pad_back_period + 1)
        lt = self.len_trend * (self.pad_forward_trend + self.pad_back_trend + 1)
        d['len_closeness'] = self.len_closeness
        d['len_period'] = lp
        d['len_trend'] = lt
        d['add_time_in_day'] = self.add_time_in_day
        d['add_day_in_week'] = self.add_day_in_week
        # which rows of the grid input feed the graph input and the
        # target-time feature, referenced by their indices in data_col
        data_col = self.config.get('data_col', [])
        for k in ['graph_input', 'target_time']:
            d[f'{k}_indices'] = []
            for n in self.config.get(f'{k}_col', []):
                # let the ValueError propagate if a column is unknown
                d[f'{k}_indices'].append(data_col.index(n))
        for k in ['risk_thresholds', 'risk_weights']:
            d[k] = self.config.get(k, [])
            if d[k] != sorted(d[k]):
                raise ValueError(f'Dataset config item {k} is not a sorted list')
        if len(d['risk_thresholds']) != len(d['risk_weights']) - 1:
            raise ValueError('Mask loss risk thresholds must be one element shorter than risk weights')
        return d
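
# Minimal sketch (not part of the class): how a grid_node_map like the one
# built above can project grid-shaped features onto graph nodes, i.e. the
# grid-to-graph transformation the indices in get_data_feature() support.
# All shapes and node placements below are made-up assumptions.
if __name__ == '__main__':
    len_row, len_column, num_nodes = 2, 3, 4
    grid_node_map = np.zeros((len_row * len_column, num_nodes), dtype=np.float32)
    # assume graph nodes 0..3 occupy grid cells (0,0), (0,2), (1,0), (1,1)
    for node, (i, j) in enumerate([(0, 0), (0, 2), (1, 0), (1, 1)]):
        grid_node_map[i * len_column + j, node] = 1.0
    # one scalar feature per grid cell, row-major: [[0 1 2], [3 4 5]]
    grid_feature = np.arange(len_row * len_column, dtype=np.float32).reshape(len_row, len_column)
    # project grid features onto graph nodes: (num_nodes, H*W) @ (H*W,)
    node_feature = grid_node_map.T @ grid_feature.reshape(-1)
    print(node_feature)  # [0. 2. 3. 4.]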