import os
import sys
import numpy as np
import datetime
from libcity.data.dataset import TrafficStateDataset
from libcity.data.utils import generate_dataloader
from libcity.utils import ensure_dir
class TrafficStateCPTDataset(TrafficStateDataset):
    """
    Alternative base class for traffic-state prediction datasets.

    Some traffic prediction models forecast by modeling closeness / period /
    trend components. By default, data from ``len_closeness`` / ``len_period``
    / ``len_trend`` time slots is used to predict the current slot, i.e. one X
    and one y (usually single-step prediction).

    **The raw timestamps of the data must not be empty!**

    External data is usually modeled separately, so one sample is
    ``[X, y, X_ext (optional), y_ext (optional)]``.

    By default the dataset is split into train/eval/test directly along the
    sample dimension (num_samples) using ``train_rate`` and ``eval_rate``.
    """

    def __init__(self, config):
        super().__init__(config)
        # Length of one time slot, in minutes
        self.offset_frame = np.timedelta64(self.time_intervals // 60, 'm')
        self.len_closeness = self.config.get('len_closeness', 3)
        self.len_period = self.config.get('len_period', 4)
        self.len_trend = self.config.get('len_trend', 0)
        # At least one of the three temporal components must be enabled
        assert (self.len_closeness + self.len_period + self.len_trend > 0)
        # NOTE(review): pad_*/interval_* presumably control padding around and
        # spacing (in days) between period/trend anchors — consumed by
        # subclasses, not used directly in this base class.
        self.pad_forward_period = self.config.get('pad_forward_period', 0)
        self.pad_back_period = self.config.get('pad_back_period', 0)
        self.pad_forward_trend = self.config.get('pad_forward_trend', 0)
        self.pad_back_trend = self.config.get('pad_back_trend', 0)
        self.interval_period = self.config.get('interval_period', 1)
        self.interval_trend = self.config.get('interval_trend', 7)
        self.feature_name = {'X': 'float', 'y': 'float', 'X_ext': 'float', 'y_ext': 'float'}
[docs] def _get_external_array(self, timestamp_list, ext_data=None, previous_ext=False):
"""
根据时间戳数组,获取对应时间的外部特征
Args:
timestamp_list(np.ndarray): 时间戳序列
ext_data: 外部数据
previous_ext: 是否是用过去时间段的外部数据,因为对于预测的时间段Y,
一般没有真实的外部数据,所以用前一个时刻的数据,**多步预测则用提前多步的数据**
Returns:
np.ndarray: External data shape is (len(timestamp_list), ext_dim)
"""
data_list = []
if self.add_time_in_day:
time_ind = (timestamp_list - timestamp_list.astype("datetime64[D]")) / np.timedelta64(1, "D")
data_list.append(time_ind.reshape(time_ind.shape[0], 1))
if self.add_day_in_week:
dayofweek = []
for day in timestamp_list.astype("datetime64[D]"):
dayofweek.append(datetime.datetime.strptime(str(day), '%Y-%m-%d').weekday())
day_in_week = np.zeros(shape=(len(timestamp_list), 7))
day_in_week[np.arange(len(timestamp_list)), dayofweek] = 1
data_list.append(day_in_week)
# 外部数据集
if ext_data is not None:
indexs = []
for ts in timestamp_list:
if previous_ext:
ts_index = self.idx_of_ext_timesolts[ts - self.offset_frame]
else:
ts_index = self.idx_of_ext_timesolts[ts]
indexs.append(ts_index)
select_data = ext_data[indexs] # len(timestamp_list) * ext_dim 选出所需要的时间步的数据
data_list.append(select_data)
if len(data_list) > 0:
data = np.concatenate(data_list, axis=1)
else:
data = np.zeros((len(timestamp_list), 0))
return data # (len(timestamp_list), ext_dim)
[docs] def _load_data(self):
"""
加载数据文件(.dyna/.grid/.od/.gridod)
Returns:
tuple: tuple contains:
x: (num_samples, T_c+T_p+T_t, ..., feature_dim)
y: (num_samples, 1, ..., feature_dim)
ts_x: (num_samples, T_c+T_p+T_t)
ts_y: (num_samples, )
"""
# 处理多数据文件问题
if isinstance(self.data_files, list):
data_files = self.data_files
else: # str
data_files = [self.data_files]
x_list, y_list, ts_x_list, ts_y_list = [], [], [], []
for filename in data_files:
df = self._load_dyna(filename) # (len_time, ..., feature_dim)
x, y, ts_x, ts_y = self._generate_input_data(df)
x_list.append(x) # x: (num_samples, T_c+T_p+T_t, ..., feature_dim)
y_list.append(y) # y: (num_samples, 1, ..., feature_dim)
ts_x_list.append(ts_x) # ts_x: (num_samples, T_c+T_p+T_t)
ts_y_list.append(ts_y) # ts_y: (num_samples, )
x = np.concatenate(x_list) # (num_samples_plus, T_c+T_p+T_t, ..., feature_dim)
y = np.concatenate(y_list) # (num_samples_plus, 1, ..., feature_dim)
ts_x = np.concatenate(ts_x_list) # (num_samples_plus, T_c+T_p+T_t)
ts_y = np.concatenate(ts_y_list) # (num_samples_plus, )
return x, y, ts_x, ts_y
[docs] def _load_ext_data(self, ts_x, ts_y):
"""
加载对应时间的外部数据(.ext)
Args:
ts_x: 输入数据X对应的时间戳,shape: (num_samples, T_c+T_p+T_t)
ts_y: 输出数据Y对应的时间戳,shape:(num_samples, )
Returns:
tuple: tuple contains:
ext_x(np.ndarray): 对应时间的外部数据, shape: (num_samples, T_c+T_p+T_t, ext_dim),
ext_y(np.ndarray): 对应时间的外部数据, shape: (num_samples, ext_dim)
"""
# 加载外部数据
if self.load_external and os.path.exists(self.data_path + self.ext_file + '.ext'): # 外部数据集
ext_data = self._load_ext()
else:
ext_data = None
ext_x = []
for ts in ts_x:
ext_x.append(self._get_external_array(ts, ext_data))
ext_x = np.asarray(ext_x)
# ext_x: (num_samples_plus, T_c+T_p+T_t, ext_dim)
ext_y = self._get_external_array(ts_y, ext_data, previous_ext=True)
# ext_y: (num_samples_plus, ext_dim)
return ext_x, ext_y
[docs] def _generate_data(self):
"""
加载数据文件(.dyna/.grid/.od/.gridod)和外部数据(.ext)
Returns:
tuple: tuple contains:
x(np.ndarray): 模型输入数据,(num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y(np.ndarray): 模型输出数据,(num_samples, 1, ..., feature_dim) \n
ext_x(np.ndarray): 模型输入外部数据,(num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y(np.ndarray): 模型输出外部数据,(num_samples, ext_dim)
"""
x, y, ts_x, ts_y = self._load_data()
if self.load_external:
ext_x, ext_y = self._load_ext_data(ts_x, ts_y)
else:
ext_x, ext_y = np.zeros((x.shape[0], 0)), np.zeros((y.shape[0], 0))
self._logger.info("Dataset created")
self._logger.info("x shape: " + str(x.shape) + ", y shape: " + str(y.shape))
self._logger.info("ext_x shape: " + str(ext_x.shape) + ", ext_y shape: " + str(ext_y.shape))
return x, y, ext_x, ext_y
[docs] def _split_train_val_test(self, x, y, ext_x=None, ext_y=None):
"""
划分训练集、测试集、验证集,并缓存数据集
Args:
x(np.ndarray): 输入数据 (num_samples, T_c+T_p+T_t, ..., feature_dim)
y(np.ndarray): 输出数据 (num_samples, 1, ..., feature_dim)
ext_x(np.ndarray): 输入外部数据 (num_samples, T_c+T_p+T_t, ext_dim)
ext_y(np.ndarray): 输出外部数据 (num_samples, ext_dim)
Returns:
tuple: tuple contains:
x_train: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_train: (num_samples, 1, ..., feature_dim) \n
x_val: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_val: (num_samples, 1, ..., feature_dim) \n
x_test: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_test: (num_samples, 1, ..., feature_dim) \n
ext_x_train: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_train: (num_samples, ext_dim) \n
ext_x_val: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_val: (num_samples, ext_dim) \n
ext_x_test: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_test: (num_samples, ext_dim)
"""
test_rate = 1 - self.train_rate - self.eval_rate
num_samples = x.shape[0]
num_test = round(num_samples * test_rate)
num_train = round(num_samples * self.train_rate)
num_val = num_samples - num_test - num_train
x_train, x_val, x_test = x[:num_train], x[num_train: num_train + num_val], x[-num_test:]
y_train, y_val, y_test = y[:num_train], y[num_train: num_train + num_val], y[-num_test:]
ext_x_train, ext_x_val, ext_x_test = ext_x[:num_train], ext_x[num_train: num_train + num_val], ext_x[-num_test:]
ext_y_train, ext_y_val, ext_y_test = ext_y[:num_train], ext_y[num_train: num_train + num_val], ext_y[-num_test:]
self._logger.info("train\t" + "x: " + str(x_train.shape) + ", y: " + str(y_train.shape)
+ ", x_ext: " + str(ext_x_train.shape) + ", y_ext: " + str(ext_y_train.shape))
self._logger.info("eval\t" + "x: " + str(x_val.shape) + ", y: " + str(y_val.shape)
+ ", x_ext: " + str(ext_x_val.shape) + ", y_ext: " + str(ext_y_val.shape))
self._logger.info("test\t" + "x: " + str(x_test.shape) + ", y: " + str(y_test.shape)
+ ", x_ext: " + str(ext_x_test.shape) + ", y_ext: " + str(ext_y_test.shape))
if self.cache_dataset:
ensure_dir(self.cache_file_folder)
np.savez_compressed(
self.cache_file_name,
x_train=x_train, y_train=y_train,
x_test=x_test, y_test=y_test,
x_val=x_val, y_val=y_val,
ext_x_train=ext_x_train, ext_y_train=ext_y_train,
ext_x_test=ext_x_test, ext_y_test=ext_y_test,
ext_x_val=ext_x_val, ext_y_val=ext_y_val,
)
self._logger.info('Saved at ' + self.cache_file_name)
return x_train, y_train, x_val, y_val, x_test, y_test, \
ext_x_train, ext_y_train, ext_x_test, ext_y_test, ext_x_val, ext_y_val
[docs] def _generate_train_val_test(self):
"""
加载数据集,并划分训练集、测试集、验证集,并缓存数据集
Returns:
tuple: tuple contains:
x_train: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_train: (num_samples, 1, ..., feature_dim) \n
x_val: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_val: (num_samples, 1, ..., feature_dim) \n
x_test: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_test: (num_samples, 1, ..., feature_dim) \n
ext_x_train: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_train: (num_samples, ext_dim) \n
ext_x_val: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_val: (num_samples, ext_dim) \n
ext_x_test: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_test: (num_samples, ext_dim)
"""
x, y, ext_x, ext_y = self._generate_data()
return self._split_train_val_test(x, y, ext_x, ext_y)
[docs] def _load_cache_train_val_test(self):
"""
加载之前缓存好的训练集、测试集、验证集
Returns:
tuple: tuple contains:
x_train: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_train: (num_samples, 1, ..., feature_dim) \n
x_val: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_val: (num_samples, 1, ..., feature_dim) \n
x_test: (num_samples, T_c+T_p+T_t, ..., feature_dim) \n
y_test: (num_samples, 1, ..., feature_dim) \n
ext_x_train: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_train: (num_samples, ext_dim) \n
ext_x_val: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_val: (num_samples, ext_dim) \n
ext_x_test: (num_samples, T_c+T_p+T_t, ext_dim) \n
ext_y_test: (num_samples, ext_dim)
"""
self._logger.info('Loading ' + self.cache_file_name)
cat_data = np.load(self.cache_file_name)
x_train = cat_data['x_train']
y_train = cat_data['y_train']
x_test = cat_data['x_test']
y_test = cat_data['y_test']
x_val = cat_data['x_val']
y_val = cat_data['y_val']
ext_x_train = cat_data['ext_x_train']
ext_y_train = cat_data['ext_y_train']
ext_x_test = cat_data['ext_x_test']
ext_y_test = cat_data['ext_y_test']
ext_x_val = cat_data['ext_x_val']
ext_y_val = cat_data['ext_y_val']
self._logger.info("train\t" + "x: " + str(x_train.shape) + ", y: " + str(y_train.shape)
+ ", x_ext: " + str(ext_x_train.shape) + ", y_ext: " + str(ext_y_train.shape))
self._logger.info("eval\t" + "x: " + str(x_val.shape) + ", y: " + str(y_val.shape)
+ ", x_ext: " + str(ext_x_val.shape) + ", y_ext: " + str(ext_y_val.shape))
self._logger.info("test\t" + "x: " + str(x_test.shape) + ", y: " + str(y_test.shape)
+ ", x_ext: " + str(ext_x_test.shape) + ", y_ext: " + str(ext_y_test.shape))
return x_train, y_train, x_val, y_val, x_test, y_test, \
ext_x_train, ext_y_train, ext_x_test, ext_y_test, ext_x_val, ext_y_val
[docs] def get_data(self):
"""
返回数据的DataLoader,包括训练数据、测试数据、验证数据
Returns:
tuple: tuple contains:
train_dataloader: Dataloader composed of Batch (class) \n
eval_dataloader: Dataloader composed of Batch (class) \n
test_dataloader: Dataloader composed of Batch (class)
"""
# 加载数据集
x_train, y_train, x_val, y_val, x_test, y_test = [], [], [], [], [], []
ext_x_train, ext_y_train, ext_x_test, ext_y_test, ext_x_val, ext_y_val = [], [], [], [], [], []
if self.data is None:
self.data = {}
if self.cache_dataset and os.path.exists(self.cache_file_name):
x_train, y_train, x_val, y_val, x_test, y_test, \
ext_x_train, ext_y_train, ext_x_test, ext_y_test, ext_x_val, ext_y_val \
= self._load_cache_train_val_test()
else:
x_train, y_train, x_val, y_val, x_test, y_test, \
ext_x_train, ext_y_train, ext_x_test, ext_y_test, ext_x_val, ext_y_val \
= self._generate_train_val_test()
# 数据归一化
self.feature_dim = x_train.shape[-1]
self.ext_dim = ext_x_train.shape[-1]
self.scaler = self._get_scalar(self.scaler_type,
x_train[..., :self.output_dim], y_train[..., :self.output_dim])
self.ext_scaler = self._get_scalar(self.ext_scaler_type,
x_train[..., self.output_dim:], y_train[..., self.output_dim:])
x_train = self.scaler.transform(x_train)
y_train = self.scaler.transform(y_train)
x_val = self.scaler.transform(x_val)
y_val = self.scaler.transform(y_val)
x_test = self.scaler.transform(x_test)
y_test = self.scaler.transform(y_test)
if self.normal_external:
ext_x_train = self.ext_scaler.transform(ext_x_train)
ext_y_train = self.ext_scaler.transform(ext_y_train)
ext_x_val = self.ext_scaler.transform(ext_x_val)
ext_y_val = self.ext_scaler.transform(ext_y_val)
ext_x_test = self.ext_scaler.transform(ext_x_test)
ext_y_test = self.ext_scaler.transform(ext_y_test)
# 把训练集的X和y聚合在一起成为list,测试集验证集同理
# x_train/y_train: (num_samples, input_length, ..., feature_dim)
# train_data(list): train_data[i]是一个元组,由x_train[i]和y_train[i]组成
train_data = list(zip(x_train, y_train, ext_x_train, ext_y_train))
eval_data = list(zip(x_val, y_val, ext_x_val, ext_y_val))
test_data = list(zip(x_test, y_test, ext_x_test, ext_y_test))
# 转Dataloader
self.train_dataloader, self.eval_dataloader, self.test_dataloader = \
generate_dataloader(train_data, eval_data, test_data, self.feature_name,
self.batch_size, self.num_workers, pad_with_last_sample=self.pad_with_last_sample)
return self.train_dataloader, self.eval_dataloader, self.test_dataloader
[docs] def get_data_feature(self):
"""
由于此类的数据输入包含len_closeness/len_period/len_trend的数据,但都融合到X中,
因此,继承此类的子类此函数应该返回这三段数据的具体长度(不一定等于上述的三个参数的值)
Returns:
dict: 包含数据集的相关特征的字典
"""
raise NotImplementedError('Please implement the function `get_data_feature()`.')