Source code for libcity.data.dataset.dataset_subclass.chebconv_dataset

import os
import pandas as pd
import numpy as np
import scipy.sparse as sp
from logging import getLogger

from libcity.utils import StandardScaler, NormalScaler, NoneScaler, \
    MinMax01Scaler, MinMax11Scaler, LogScaler, ensure_dir
from libcity.data.dataset import AbstractDataset


class ChebConvDataset(AbstractDataset):

    def __init__(self, config):
        self.config = config
        self.dataset = self.config.get('dataset', '')
        self.cache_dataset = self.config.get('cache_dataset', True)
        self.train_rate = self.config.get('train_rate', 0.7)
        self.eval_rate = self.config.get('eval_rate', 0.1)
        self.scaler_type = self.config.get('scaler', 'none')
        # cache-path related parameters
        self.parameters_str = \
            str(self.dataset) + '_' + str(self.train_rate) + '_' \
            + str(self.eval_rate) + '_' + str(self.scaler_type)
        self.cache_file_name = os.path.join('./libcity/cache/dataset_cache/',
                                            'road_rep_{}.npz'.format(self.parameters_str))
        self.cache_file_folder = './libcity/cache/dataset_cache/'
        ensure_dir(self.cache_file_folder)
        self.data_path = './raw_data/' + self.dataset + '/'
        if not os.path.exists(self.data_path):
            raise ValueError("Dataset {} does not exist! Please ensure the path "
                             "'./raw_data/{}/' exists!".format(self.dataset, self.dataset))
        # file names from the dataset's config.json
        self.geo_file = self.config.get('geo_file', self.dataset)
        self.rel_file = self.config.get('rel_file', self.dataset)
        # initialization
        self.adj_mx = None
        self.scaler = None
        self.feature_dim = 0
        self.num_nodes = 0
        self._logger = getLogger()
        self._load_geo()
        self._load_rel()

    def _load_geo(self):
        """
        Load the .geo file, format: [geo_id, type, coordinates, properties (several columns)].
        """
        geofile = pd.read_csv(self.data_path + self.geo_file + '.geo')
        self.geo_ids = list(geofile['geo_id'])
        self.num_nodes = len(self.geo_ids)
        self.geo_to_ind = {}
        for index, idx in enumerate(self.geo_ids):
            self.geo_to_ind[idx] = index
        self._logger.info("Loaded file " + self.geo_file + '.geo'
                          + ', num_nodes=' + str(len(self.geo_ids)))
        self.road_info = geofile

    def _load_rel(self):
        """
        Load the .rel file, format: [rel_id, type, origin_id, destination_id, properties (several columns)].
        Builds the N*N adjacency matrix; by default an edge present in .rel is set to 1,
        an absent edge is 0.

        Sets:
            self.adj_mx (scipy.sparse.coo_matrix): N*N adjacency matrix
        """
        map_info = pd.read_csv(self.data_path + self.rel_file + '.rel')
        # build the adjacency matrix as a sparse matrix
        adj_row = []
        adj_col = []
        adj_data = []
        adj_set = set()
        cnt = 0
        for i in range(map_info.shape[0]):
            if map_info['origin_id'][i] in self.geo_to_ind and map_info['destination_id'][i] in self.geo_to_ind:
                f_id = self.geo_to_ind[map_info['origin_id'][i]]
                t_id = self.geo_to_ind[map_info['destination_id'][i]]
                if (f_id, t_id) not in adj_set:
                    adj_set.add((f_id, t_id))
                    adj_row.append(f_id)
                    adj_col.append(t_id)
                    adj_data.append(1.0)
                    cnt = cnt + 1
        self.adj_mx = sp.coo_matrix((adj_data, (adj_row, adj_col)),
                                    shape=(self.num_nodes, self.num_nodes))
        save_path = self.cache_file_folder + "{}_adj_mx.npz".format(self.dataset)
        sp.save_npz(save_path, self.adj_mx)
        self._logger.info('Total link between geo = {}'.format(cnt))
        self._logger.info('Adj_mx is saved at {}'.format(save_path))

    def _split_train_val_test(self):
        # TODO: abstract the normalization below into a function so that the columns to
        #       preprocess can be configured externally and handled uniformly.
        # node_features = self.road_info[['highway', 'length', 'lanes', 'tunnel', 'bridge',
        #                                 'maxspeed', 'width', 'service', 'junction', 'key']].values
        # 'tunnel', 'bridge', 'service', 'junction', 'key' are binary (0/1): 1+1+1+1+1
        # 'lanes', 'highway' are categorical: 47+6
        # 'length', 'maxspeed', 'width' are floats: 1+1+1, 61 columns in total
        node_features = self.road_info[self.road_info.columns[3:]]
        # min-max normalize selected columns
        norm_dict = {
            'length': 1,
            'maxspeed': 5,
            'width': 6
        }
        for k, v in norm_dict.items():
            d = node_features[k]
            min_ = d.min()
            max_ = d.max()
            dnew = (d - min_) / (max_ - min_)
            node_features = node_features.drop(k, axis=1)
            node_features.insert(v, k, dnew)
        # one-hot encode selected columns
        onehot_list = ['lanes', 'highway']
        for col in onehot_list:
            dum_col = pd.get_dummies(node_features[col], col)
            node_features = node_features.drop(col, axis=1)
            node_features = pd.concat([node_features, dum_col], axis=1)
        node_features = node_features.values
        np.save(self.cache_file_folder + '{}_node_features.npy'.format(self.dataset), node_features)
        # split node indices into train/valid/test masks
        sindex = list(range(self.num_nodes))
        np.random.seed(1234)
        np.random.shuffle(sindex)

        test_rate = 1 - self.train_rate - self.eval_rate
        num_test = round(self.num_nodes * test_rate)
        num_train = round(self.num_nodes * self.train_rate)
        num_val = self.num_nodes - num_test - num_train

        train_mask = np.array(sorted(sindex[0: num_train]))
        valid_mask = np.array(sorted(sindex[num_train: num_train + num_val]))
        test_mask = np.array(sorted(sindex[-num_test:]))

        if self.cache_dataset:
            ensure_dir(self.cache_file_folder)
            np.savez_compressed(
                self.cache_file_name,
                node_features=node_features,
                train_mask=train_mask,
                valid_mask=valid_mask,
                test_mask=test_mask
            )
            self._logger.info('Saved at ' + self.cache_file_name)
        self._logger.info("len train feature\t" + str(len(train_mask)))
        self._logger.info("len eval feature\t" + str(len(valid_mask)))
        self._logger.info("len test feature\t" + str(len(test_mask)))
        return node_features, train_mask, valid_mask, test_mask

    def _load_cache_train_val_test(self):
        """
        Load the previously cached train, validation, and test sets.
        """
        self._logger.info('Loading ' + self.cache_file_name)
        cat_data = np.load(self.cache_file_name, allow_pickle=True)
        node_features = cat_data['node_features']
        train_mask = cat_data['train_mask']
        valid_mask = cat_data['valid_mask']
        test_mask = cat_data['test_mask']
        self._logger.info("len train feature\t" + str(len(train_mask)))
        self._logger.info("len eval feature\t" + str(len(valid_mask)))
        self._logger.info("len test feature\t" + str(len(test_mask)))
        return node_features, train_mask, valid_mask, test_mask

    def _get_scalar(self, scaler_type, data):
        """
        Select the data normalization method according to the global parameter `scaler_type`.

        Args:
            scaler_type (str): type of scaler to build
            data: training data X

        Returns:
            Scaler: the scaler object
        """
        if scaler_type == "normal":
            scaler = NormalScaler(maxx=data.max())
            self._logger.info('NormalScaler max: ' + str(scaler.max))
        elif scaler_type == "standard":
            scaler = StandardScaler(mean=data.mean(), std=data.std())
            self._logger.info('StandardScaler mean: ' + str(scaler.mean) + ', std: ' + str(scaler.std))
        elif scaler_type == "minmax01":
            scaler = MinMax01Scaler(maxx=data.max(), minn=data.min())
            self._logger.info('MinMax01Scaler max: ' + str(scaler.max) + ', min: ' + str(scaler.min))
        elif scaler_type == "minmax11":
            scaler = MinMax11Scaler(maxx=data.max(), minn=data.min())
            self._logger.info('MinMax11Scaler max: ' + str(scaler.max) + ', min: ' + str(scaler.min))
        elif scaler_type == "log":
            scaler = LogScaler()
            self._logger.info('LogScaler')
        elif scaler_type == "none":
            scaler = NoneScaler()
            self._logger.info('NoneScaler')
        else:
            raise ValueError('Scaler type error!')
        return scaler

    def get_data(self):
        """
        Return the data for training, evaluation, and testing.

        Returns:
            tuple: (train_dataloader, eval_dataloader, test_dataloader), each a dict with
            keys 'node_features' and 'mask'
        """
        # load the dataset, from cache if available
        if self.cache_dataset and os.path.exists(self.cache_file_name):
            node_features, train_mask, valid_mask, test_mask = self._load_cache_train_val_test()
        else:
            node_features, train_mask, valid_mask, test_mask = self._split_train_val_test()
        # normalize the data
        self.feature_dim = node_features.shape[-1]
        self.scaler = self._get_scalar(self.scaler_type, node_features)
        node_features = self.scaler.transform(node_features)
        self.train_dataloader = {'node_features': node_features, 'mask': train_mask}
        self.eval_dataloader = {'node_features': node_features, 'mask': valid_mask}
        self.test_dataloader = {'node_features': node_features, 'mask': test_mask}
        return self.train_dataloader, self.eval_dataloader, self.test_dataloader

    def get_data_feature(self):
        """
        Return a dict of features describing the dataset.

        Returns:
            dict: dataset features, including the scaler, the adjacency matrix,
            the number of nodes, and the feature dimension
        """
        return {"scaler": self.scaler, "adj_mx": self.adj_mx,
                "num_nodes": self.num_nodes, "feature_dim": self.feature_dim}
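
# --- Usage sketch (illustrative, not part of the original module) -----------------
# A minimal, hedged example of how ChebConvDataset might be driven. The dataset name
# 'my_roadmap' and the config values below are assumptions; the matching
# ./raw_data/my_roadmap/my_roadmap.geo and .rel files must exist for this to run.
if __name__ == '__main__':
    config = {
        'dataset': 'my_roadmap',   # hypothetical dataset name
        'cache_dataset': True,
        'train_rate': 0.7,
        'eval_rate': 0.1,
        'scaler': 'none',
    }
    dataset = ChebConvDataset(config)
    # Each "dataloader" is a dict holding the full node-feature matrix plus the
    # node indices (mask) belonging to that split.
    train_data, eval_data, test_data = dataset.get_data()
    feature = dataset.get_data_feature()
    print('num_nodes:', feature['num_nodes'], 'feature_dim:', feature['feature_dim'])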