import os
import numpy as np
from scipy.spatial.distance import cdist
from libcity.utils import ensure_dir
from libcity.data.dataset import TrafficStatePointDataset
# from libcity.data.dataset import TrafficStateGridDataset
"""
主要功能是定义了一种根据原始交通状态数据计算邻接矩阵的方法
CCRNNDataset既可以继承TrafficStatePointDataset,也可以继承TrafficStateGridDataset以处理网格数据
修改成TrafficStateGridDataset时,只需要修改:
1.TrafficStatePointDataset-->TrafficStateGridDataset
2.self.use_row_column = False, 可以加到self.parameters_str中
3.需要修改_generate_graph_with_data函数!
"""
class CCRNNDataset(TrafficStatePointDataset):
def __init__(self, config):
super().__init__(config)
        self.use_row_column = False
        # Dimension of the SVD-based node embedding used to build the graph
        self.hidden_size = config.get('hidden_size', 20)
        # 'big': dense N*N Gaussian-kernel adjacency; 'small': N*hidden_size embedding
        self.method = config.get('method', 'big')
        self.parameters_str += '_save_adj'
self.cache_file_name = os.path.join('./libcity/cache/dataset_cache/',
'point_based_{}.npz'.format(self.parameters_str))
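    # Example (illustrative only; in LibCity the config dict is normally built by
    # the framework's ConfigParser rather than by hand):
    #   config = {'hidden_size': 20, 'method': 'big', ...}
    #   dataset = CCRNNDataset(config)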
    def _load_rel(self):
        """
        Override the parent method with a no-op: no .rel file is loaded here,
        because the adjacency matrix is generated from the raw data later in
        _generate_graph_with_data().
        """
        pass
        # self._logger.info("Generate rel file from data, shape=" + str(self.adj_mx.shape))
    def _generate_data(self):
        """
        Load the data file(s) (.dyna/.grid/.od/.gridod) and the external data (.ext),
        fuse the two, and return them in the form of X, y plus the raw data.

        Returns:
            tuple: tuple contains:
                x(np.ndarray): model input, (num_samples, input_length, ..., feature_dim) \n
                y(np.ndarray): model target, (num_samples, output_length, ..., feature_dim) \n
                df(np.ndarray): concatenated raw data, (len_time, ..., feature_dim)
        """
        # Handle the case of multiple data files
if isinstance(self.data_files, list):
data_files = self.data_files.copy()
        else:  # str
            data_files = [self.data_files]
        # Load external data
        if self.load_external and os.path.exists(self.data_path + self.ext_file + '.ext'):  # external dataset exists
ext_data = self._load_ext()
else:
ext_data = None
x_list, y_list = [], []
df_list = []
for filename in data_files:
df = self._load_dyna(filename) # (len_time, ..., feature_dim)
df_list.append(df.copy())
if self.load_external:
df = self._add_external_information(df, ext_data)
x, y = self._generate_input_data(df)
# x: (num_samples, input_length, ..., input_dim)
# y: (num_samples, output_length, ..., output_dim)
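            # e.g. with input_window=12 and output_window=3 (config names as used
            # elsewhere in LibCity), each sample pairs 12 consecutive time steps
            # of input with the following 3 steps as the target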
x_list.append(x)
y_list.append(y)
x = np.concatenate(x_list)
y = np.concatenate(y_list)
df = np.concatenate(df_list)
self._logger.info("Dataset created")
self._logger.info("x shape: " + str(x.shape) + ", y shape: " + str(y.shape))
return x, y, df
    def _split_train_val_test(self, x, y, df=None):
        """
        Split the data into training, validation and test sets, and cache them.

        Args:
            x(np.ndarray): input data, (num_samples, input_length, ..., feature_dim)
            y(np.ndarray): target data, (num_samples, output_length, ..., feature_dim)
            df(np.ndarray): raw data used to generate the adjacency matrix

        Returns:
            tuple: tuple contains:
                x_train: (num_samples, input_length, ..., feature_dim) \n
                y_train: (num_samples, output_length, ..., feature_dim) \n
                x_val: (num_samples, input_length, ..., feature_dim) \n
                y_val: (num_samples, output_length, ..., feature_dim) \n
                x_test: (num_samples, input_length, ..., feature_dim) \n
                y_test: (num_samples, output_length, ..., feature_dim)
        """
test_rate = 1 - self.train_rate - self.eval_rate
num_samples = x.shape[0]
num_test = round(num_samples * test_rate)
num_train = round(num_samples * self.train_rate)
num_val = num_samples - num_test - num_train
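        # e.g. with train_rate=0.7 and eval_rate=0.1, test_rate=0.2; for
        # num_samples=1000 this gives num_train=700, num_test=200, num_val=100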
# train
x_train, y_train = x[:num_train], y[:num_train]
# val
x_val, y_val = x[num_train: num_train + num_val], y[num_train: num_train + num_val]
# test
x_test, y_test = x[-num_test:], y[-num_test:]
self._logger.info("train\t" + "x: " + str(x_train.shape) + ", y: " + str(y_train.shape))
self._logger.info("eval\t" + "x: " + str(x_val.shape) + ", y: " + str(y_val.shape))
self._logger.info("test\t" + "x: " + str(x_test.shape) + ", y: " + str(y_test.shape))
        self.adj_mx = self._generate_graph_with_data(data=df, length=num_train)
if self.cache_dataset:
ensure_dir(self.cache_file_folder)
np.savez_compressed(
self.cache_file_name,
x_train=x_train,
y_train=y_train,
x_test=x_test,
y_test=y_test,
x_val=x_val,
y_val=y_val,
adj_mx=self.adj_mx
)
self._logger.info('Saved at ' + self.cache_file_name)
return x_train, y_train, x_val, y_val, x_test, y_test
    def _generate_train_val_test(self):
        """
        Load the dataset, split it into training, validation and test sets, and cache them.

        Returns:
            tuple: tuple contains:
                x_train: (num_samples, input_length, ..., feature_dim) \n
                y_train: (num_samples, output_length, ..., feature_dim) \n
                x_val: (num_samples, input_length, ..., feature_dim) \n
                y_val: (num_samples, output_length, ..., feature_dim) \n
                x_test: (num_samples, input_length, ..., feature_dim) \n
                y_test: (num_samples, output_length, ..., feature_dim)
        """
x, y, df = self._generate_data()
return self._split_train_val_test(x, y, df)
    def _load_cache_train_val_test(self):
        """
        Load the previously cached training, validation and test sets.

        Returns:
            tuple: tuple contains:
                x_train: (num_samples, input_length, ..., feature_dim) \n
                y_train: (num_samples, output_length, ..., feature_dim) \n
                x_val: (num_samples, input_length, ..., feature_dim) \n
                y_val: (num_samples, output_length, ..., feature_dim) \n
                x_test: (num_samples, input_length, ..., feature_dim) \n
                y_test: (num_samples, output_length, ..., feature_dim)
        """
self._logger.info('Loading ' + self.cache_file_name)
cat_data = np.load(self.cache_file_name)
x_train = cat_data['x_train']
y_train = cat_data['y_train']
x_test = cat_data['x_test']
y_test = cat_data['y_test']
x_val = cat_data['x_val']
y_val = cat_data['y_val']
self.adj_mx = cat_data['adj_mx']
self._logger.info("train\t" + "x: " + str(x_train.shape) + ", y: " + str(y_train.shape))
self._logger.info("eval\t" + "x: " + str(x_val.shape) + ", y: " + str(y_val.shape))
self._logger.info("test\t" + "x: " + str(x_test.shape) + ", y: " + str(y_test.shape))
self._logger.info("Generate rel file from data, shape=" + str(self.adj_mx.shape))
return x_train, y_train, x_val, y_val, x_test, y_test
    def _generate_graph_with_data(self, data, length):
        """
        Generate the adjacency matrix from the raw data via singular value decomposition.

        Args:
            data(np.ndarray): raw data, (len_time, num_nodes, feature_dim)
            length(int): number of leading time steps (the training portion) to use

        Returns:
            np.ndarray: N*N adjacency matrix if self.method == 'big',
                N*hidden_size node embedding if self.method == 'small'
        """
        data = data[:length, ...]
        len_time, num_nodes, feature_dim = data.shape[0], data.shape[1], data.shape[2]
        inputs = np.swapaxes(data, 1, 2).reshape(-1, num_nodes)  # m*n
        self._logger.info("Start singular value decomposition, data.shape={}!".format(str(inputs.shape)))
        u, s, v = np.linalg.svd(inputs)  # u=(m*m), v=(n*n)
        # Keep the top hidden_size singular values/vectors as a node embedding, n*hid
        w = np.diag(s[:self.hidden_size]).dot(v[:self.hidden_size, :]).T  # n*hid
        support = None
        if self.method == 'big':
            self._logger.info("Start calculating adjacency matrix!")
            # Pairwise euclidean distances between node embeddings, n*n
            graph = cdist(w, w, metric='euclidean')
            # Gaussian-style kernel exp(-d / std(d)^2): closer nodes get larger weights
            support = graph * -1 / np.std(graph) ** 2
            support = np.exp(support)  # n*n
        elif self.method == 'small':
            support = w  # n*hid, use the embedding itself as the graph representation
        self._logger.info("Generate rel file from data, shape=" + str(support.shape))
        return support
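

# A minimal, self-contained sketch of the SVD-based graph construction above,
# run on random data; the shapes and the hidden size are illustrative assumptions,
# not values taken from any real dataset:
if __name__ == '__main__':
    rng = np.random.default_rng(0)
    demo = rng.random((128, 10, 2))  # (len_time, num_nodes, feature_dim)
    hid = 4
    flat = np.swapaxes(demo, 1, 2).reshape(-1, 10)  # m*n, m = len_time * feature_dim
    _, s_demo, v_demo = np.linalg.svd(flat)
    w_demo = np.diag(s_demo[:hid]).dot(v_demo[:hid, :]).T  # n*hid node embedding
    d = cdist(w_demo, w_demo, metric='euclidean')  # n*n pairwise distances
    adj_demo = np.exp(-d / np.std(d) ** 2)  # n*n Gaussian-kernel adjacency ('big')
    print(adj_demo.shape)  # (10, 10)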