import os
import pandas as pd
import numpy as np
import datetime
from logging import getLogger
from libcity.data.dataset import AbstractDataset
from libcity.data.utils import generate_dataloader
from libcity.utils import StandardScaler, NormalScaler, NoneScaler, \
MinMax01Scaler, MinMax11Scaler, LogScaler, ensure_dir
class TrafficStateDataset(AbstractDataset):
"""
Base class for traffic-state prediction datasets.
By default, `input_window` steps of data are used to predict the following `output_window` steps, i.e. one X and one y per sample.
External data are usually fused into X so that both are used for prediction, hence the data take the form [X, y].
By default, `train_rate` and `eval_rate` split the training, validation and test sets directly along the sample dimension (num_samples).
A toy example of how one sample is built is sketched in the comment below.
"""
def __init__(self, config):
self.config = config
self.dataset = self.config.get('dataset', '')
self.batch_size = self.config.get('batch_size', 64)
self.cache_dataset = self.config.get('cache_dataset', True)
self.num_workers = self.config.get('num_workers', 0)
self.pad_with_last_sample = self.config.get('pad_with_last_sample', True)
self.train_rate = self.config.get('train_rate', 0.7)
self.eval_rate = self.config.get('eval_rate', 0.1)
self.scaler_type = self.config.get('scaler', 'none')
self.ext_scaler_type = self.config.get('ext_scaler', 'none')
self.load_external = self.config.get('load_external', False)
self.normal_external = self.config.get('normal_external', False)
self.add_time_in_day = self.config.get('add_time_in_day', False)
self.add_day_in_week = self.config.get('add_day_in_week', False)
self.input_window = self.config.get('input_window', 12)
self.output_window = self.config.get('output_window', 12)
self.parameters_str = \
str(self.dataset) + '_' + str(self.input_window) + '_' + str(self.output_window) + '_' \
+ str(self.train_rate) + '_' + str(self.eval_rate) + '_' + str(self.scaler_type) + '_' \
+ str(self.batch_size) + '_' + str(self.load_external) + '_' + str(self.add_time_in_day) + '_' \
+ str(self.add_day_in_week) + '_' + str(self.pad_with_last_sample)
self.cache_file_name = os.path.join('./libcity/cache/dataset_cache/',
'traffic_state_{}.npz'.format(self.parameters_str))
self.cache_file_folder = './libcity/cache/dataset_cache/'
ensure_dir(self.cache_file_folder)
self.data_path = './raw_data/' + self.dataset + '/'
if not os.path.exists(self.data_path):
raise ValueError("Dataset {} not exist! Please ensure the path "
"'./raw_data/{}/' exist!".format(self.dataset, self.dataset))
# Parameters from the dataset's config.json file
self.weight_col = self.config.get('weight_col', '')
self.data_col = self.config.get('data_col', '')
self.ext_col = self.config.get('ext_col', '')
self.geo_file = self.config.get('geo_file', self.dataset)
self.rel_file = self.config.get('rel_file', self.dataset)
self.data_files = self.config.get('data_files', self.dataset)
self.ext_file = self.config.get('ext_file', self.dataset)
self.output_dim = self.config.get('output_dim', 1)
self.time_intervals = self.config.get('time_intervals', 300) # s
self.init_weight_inf_or_zero = self.config.get('init_weight_inf_or_zero', 'inf')
self.set_weight_link_or_dist = self.config.get('set_weight_link_or_dist', 'dist')
self.bidir_adj_mx = self.config.get('bidir_adj_mx', False)
self.calculate_weight_adj = self.config.get('calculate_weight_adj', False)
self.weight_adj_epsilon = self.config.get('weight_adj_epsilon', 0.1)
# Initialization
self.data = None
self.feature_name = {'X': 'float', 'y': 'float'} # the input of this class consists only of X and y
self.adj_mx = None
self.scaler = None
self.ext_scaler = None
self.feature_dim = 0
self.ext_dim = 0
self.num_nodes = 0
self.num_batches = 0
self._logger = getLogger()
if os.path.exists(self.data_path + self.geo_file + '.geo'):
self._load_geo()
else:
raise ValueError('.geo file not found!')
if os.path.exists(self.data_path + self.rel_file + '.rel'): # .rel file is not necessary
self._load_rel()
else:
self.adj_mx = np.zeros((len(self.geo_ids), len(self.geo_ids)), dtype=np.float32)
def _load_geo(self):
"""
Load the .geo file, whose columns are [geo_id, type, coordinates, properties (several columns)]
"""
geofile = pd.read_csv(self.data_path + self.geo_file + '.geo')
self.geo_ids = list(geofile['geo_id'])
self.num_nodes = len(self.geo_ids)
self.geo_to_ind = {}
self.ind_to_geo = {}
for index, idx in enumerate(self.geo_ids):
self.geo_to_ind[idx] = index
self.ind_to_geo[index] = idx
self._logger.info("Loaded file " + self.geo_file + '.geo' + ', num_nodes=' + str(len(self.geo_ids)))
def _load_grid_geo(self):
"""
Load the .geo file, whose columns are [geo_id, type, coordinates, row_id, column_id, properties (several columns)]
"""
geofile = pd.read_csv(self.data_path + self.geo_file + '.geo')
self.geo_ids = list(geofile['geo_id'])
self.num_nodes = len(self.geo_ids)
self.geo_to_ind = {}
self.geo_to_rc = {}
for index, idx in enumerate(self.geo_ids):
self.geo_to_ind[idx] = index
for i in range(geofile.shape[0]):
self.geo_to_rc[geofile['geo_id'][i]] = [geofile['row_id'][i], geofile['column_id'][i]]
self.len_row = max(list(geofile['row_id'])) + 1
self.len_column = max(list(geofile['column_id'])) + 1
self._logger.info("Loaded file " + self.geo_file + '.geo' + ', num_grids=' + str(len(self.geo_ids))
+ ', grid_size=' + str((self.len_row, self.len_column)))
def _load_rel(self):
"""
Load the .rel file, whose columns are [rel_id, type, origin_id, destination_id, properties (several columns)],
and build the N*N adjacency matrix. The computation logic is as follows:
(1) The column holding the weights is specified by the global parameter `weight_col`. \
(2) If this parameter is not set, \
(2.1) if the rel file has only 4 columns, every row is treated as one adjacent edge with weight 1, and all other edges get weight 0 (not adjacent); \
(2.2) if the rel file has exactly 5 columns, the last column is used as `weight_col` by default; \
(2.3) otherwise an error is raised. \
(3) The adjacency matrix is computed from the resolved weight column `weight_col`: \
(3.1) `bidir_adj_mx`=True builds an undirected graph, =False a directed graph; \
(3.2) `set_weight_link_or_dist`='link' builds a 0/1 matrix, 'dist' builds a weighted (non-binary) matrix; \
(3.3) `init_weight_inf_or_zero`='zero' initializes the matrix with all zeros, 'inf' with all inf; the initial value is the weight assigned to edges that do not appear in the rel file; \
(3.4) `calculate_weight_adj`=True applies a thresholded Gaussian kernel to sparsify the weighted matrix (0/1 matrices are left unchanged), =False skips the sparsification;
override self._calculate_adjacency_matrix() to replace the thresholded Gaussian kernel with another sparsification method. \
A small worked example is sketched in the comment below.
Returns:
np.ndarray: self.adj_mx, the N*N adjacency matrix
"""
relfile = pd.read_csv(self.data_path + self.rel_file + '.rel')
self._logger.info('set_weight_link_or_dist: {}'.format(self.set_weight_link_or_dist))
self._logger.info('init_weight_inf_or_zero: {}'.format(self.init_weight_inf_or_zero))
if self.weight_col != '': # resolve the weight column from weight_col
if isinstance(self.weight_col, list):
if len(self.weight_col) != 1:
raise ValueError('`weight_col` parameter must be only one column!')
self.weight_col = self.weight_col[0]
self.distance_df = relfile[~relfile[self.weight_col].isna()][[
'origin_id', 'destination_id', self.weight_col]]
else:
if len(relfile.columns) > 5 or len(relfile.columns) < 4: # more than one properties column and no weight_col specified, raise an error
raise ValueError("Don't know which column to be loaded! Please set `weight_col` parameter!")
elif len(relfile.columns) == 4: # 4 columns means no properties column: pairs present in the rel file are adjacent, all others are not
self.calculate_weight_adj = False
self.set_weight_link_or_dist = 'link'
self.init_weight_inf_or_zero = 'zero'
self.distance_df = relfile[['origin_id', 'destination_id']]
else: # len(relfile.columns) == 5, exactly one properties column, which is taken as the weight column by default
self.weight_col = relfile.columns[-1]
self.distance_df = relfile[~relfile[self.weight_col].isna()][[
'origin_id', 'destination_id', self.weight_col]]
# Convert the edge list into matrix form
self.adj_mx = np.zeros((len(self.geo_ids), len(self.geo_ids)), dtype=np.float32)
if self.init_weight_inf_or_zero.lower() == 'inf' and self.set_weight_link_or_dist.lower() != 'link':
self.adj_mx[:] = np.inf
for row in self.distance_df.values:
if row[0] not in self.geo_to_ind or row[1] not in self.geo_to_ind:
continue
if self.set_weight_link_or_dist.lower() == 'dist': # keep the raw distance values
self.adj_mx[self.geo_to_ind[row[0]], self.geo_to_ind[row[1]]] = row[2]
if self.bidir_adj_mx:
self.adj_mx[self.geo_to_ind[row[1]], self.geo_to_ind[row[0]]] = row[2]
else: # self.set_weight_link_or_dist.lower() == 'link', keep only 0/1 adjacency
self.adj_mx[self.geo_to_ind[row[0]], self.geo_to_ind[row[1]]] = 1
if self.bidir_adj_mx:
self.adj_mx[self.geo_to_ind[row[1]], self.geo_to_ind[row[0]]] = 1
self._logger.info("Loaded file " + self.rel_file + '.rel, shape=' + str(self.adj_mx.shape))
# Compute the (sparsified) weights
if self.calculate_weight_adj and self.set_weight_link_or_dist.lower() != 'link':
self._calculate_adjacency_matrix()
def _load_grid_rel(self):
"""
Build the adjacency matrix from the grid structure: each cell is adjacent to its 8 surrounding cells,
as illustrated in the comment below.
Returns:
np.ndarray: self.adj_mx, the N*N adjacency matrix
"""
self.adj_mx = np.zeros((len(self.geo_ids), len(self.geo_ids)), dtype=np.float32)
dirs = [[0, 1], [1, 0], [-1, 0], [0, -1], [1, 1], [1, -1], [-1, 1], [-1, -1]]
for i in range(self.len_row):
for j in range(self.len_column):
index = i * self.len_column + j # grid_id
for d in dirs:
nei_i = i + d[0]
nei_j = j + d[1]
if nei_i >= 0 and nei_i < self.len_row and nei_j >= 0 and nei_j < self.len_column:
nei_index = nei_i * self.len_column + nei_j # neighbor_grid_id
self.adj_mx[index][nei_index] = 1
self.adj_mx[nei_index][index] = 1
self._logger.info("Generate grid rel file, shape=" + str(self.adj_mx.shape))
def _calculate_adjacency_matrix(self):
r"""
Compute the adjacency-matrix weights with a thresholded Gaussian kernel; override this function to use another method.
The formula is $ w_{ij} = \exp \left( -\frac{d_{ij}^{2}}{\sigma^{2}} \right) $, where $\sigma$ is the standard deviation of the distances,
and entries below the threshold `weight_adj_epsilon` are set to 0: $ w_{ij}[w_{ij} < \epsilon] = 0 $.
A numeric example is sketched in the comment below.
Returns:
np.ndarray: self.adj_mx, the N*N adjacency matrix
"""
self._logger.info("Start Calculate the weight by Gauss kernel!")
distances = self.adj_mx[~np.isinf(self.adj_mx)].flatten()
std = distances.std()
self.adj_mx = np.exp(-np.square(self.adj_mx / std))
self.adj_mx[self.adj_mx < self.weight_adj_epsilon] = 0
def _load_dyna(self, filename):
"""
Load the data file (.dyna/.grid/.od/.gridod). Subclasses must implement this method to specify how the data file is loaded and to return the corresponding multi-dimensional array.
Several ready-made helpers load the above file types and convert them into arrays of different shapes:
`_load_dyna_3d`/`_load_grid_3d`/`_load_grid_4d`/`_load_od_4d`/`_load_grid_od_4d`/`_load_grid_od_6d`
Args:
filename(str): data file name, without the extension
Returns:
np.ndarray: the data array
"""
raise NotImplementedError('Please implement the function `_load_dyna()`.')
def _load_dyna_3d(self, filename):
"""
Load the .dyna file, whose columns are [dyna_id, type, time, entity_id, properties (several columns)].
The id order in the .geo file should match the order in the .dyna file.
The global parameter `data_col` specifies which data columns to load; if unset, all columns are loaded.
The row layout is sketched in the comment below.
Args:
filename(str): data file name, without the extension
Returns:
np.ndarray: data array, 3d-array: (len_time, num_nodes, feature_dim)
"""
# Load the data file
self._logger.info("Loading file " + filename + '.dyna')
dynafile = pd.read_csv(self.data_path + filename + '.dyna')
if self.data_col != '': # load only the specified data columns
if isinstance(self.data_col, list):
data_col = self.data_col.copy()
else: # str
data_col = [self.data_col].copy()
data_col.insert(0, 'time')
data_col.insert(1, 'entity_id')
dynafile = dynafile[data_col]
else: # if unspecified, load all columns
dynafile = dynafile[dynafile.columns[2:]] # all columns starting from the time column
# Build the list of time slots
self.timesolts = list(dynafile['time'][:int(dynafile.shape[0] / len(self.geo_ids))])
self.idx_of_timesolts = dict()
if not dynafile['time'].isna().any(): # no missing time values
self.timesolts = list(map(lambda x: x.replace('T', ' ').replace('Z', ''), self.timesolts))
self.timesolts = np.array(self.timesolts, dtype='datetime64[ns]')
for idx, _ts in enumerate(self.timesolts):
self.idx_of_timesolts[_ts] = idx
# Convert to a 3-d array
feature_dim = len(dynafile.columns) - 2
df = dynafile[dynafile.columns[-feature_dim:]]
len_time = len(self.timesolts)
data = []
for i in range(0, df.shape[0], len_time):
data.append(df[i:i+len_time].values)
data = np.array(data, dtype=float) # (len(self.geo_ids), len_time, feature_dim)
data = data.swapaxes(0, 1) # (len_time, len(self.geo_ids), feature_dim)
self._logger.info("Loaded file " + filename + '.dyna' + ', shape=' + str(data.shape))
return data
def _load_grid_3d(self, filename):
"""
Load the .grid file, whose columns are [dyna_id, type, time, row_id, column_id, properties (several columns)].
The id order in the .geo file should match the order in the .grid file.
The global parameter `data_col` specifies which data columns to load; if unset, all columns are loaded.
Args:
filename(str): data file name, without the extension
Returns:
np.ndarray: data array, 3d-array: (len_time, num_grids, feature_dim)
"""
# Load the data file
self._logger.info("Loading file " + filename + '.grid')
gridfile = pd.read_csv(self.data_path + filename + '.grid')
if self.data_col != '': # load only the specified data columns
if isinstance(self.data_col, list):
data_col = self.data_col.copy()
else: # str
data_col = [self.data_col].copy()
data_col.insert(0, 'time')
data_col.insert(1, 'row_id')
data_col.insert(2, 'column_id')
gridfile = gridfile[data_col]
else: # if unspecified, load all columns
gridfile = gridfile[gridfile.columns[2:]] # all columns starting from the time column
# Build the list of time slots
self.timesolts = list(gridfile['time'][:int(gridfile.shape[0] / len(self.geo_ids))])
self.idx_of_timesolts = dict()
if not gridfile['time'].isna().any(): # no missing time values
self.timesolts = list(map(lambda x: x.replace('T', ' ').replace('Z', ''), self.timesolts))
self.timesolts = np.array(self.timesolts, dtype='datetime64[ns]')
for idx, _ts in enumerate(self.timesolts):
self.idx_of_timesolts[_ts] = idx
# Convert to a 3-d array
feature_dim = len(gridfile.columns) - 3
df = gridfile[gridfile.columns[-feature_dim:]]
len_time = len(self.timesolts)
data = []
for i in range(0, df.shape[0], len_time):
data.append(df[i:i + len_time].values)
data = np.array(data, dtype=float) # (len(self.geo_ids), len_time, feature_dim)
data = data.swapaxes(0, 1) # (len_time, len(self.geo_ids), feature_dim)
self._logger.info("Loaded file " + filename + '.grid' + ', shape=' + str(data.shape))
return data
def _load_grid_4d(self, filename):
"""
Load the .grid file, whose columns are [dyna_id, type, time, row_id, column_id, properties (several columns)].
The id order in the .geo file should match the order in the .grid file.
The global parameter `data_col` specifies which data columns to load; if unset, all columns are loaded.
The index arithmetic is sketched in the comment below.
Args:
filename(str): data file name, without the extension
Returns:
np.ndarray: data array, 4d-array: (len_time, len_row, len_column, feature_dim)
"""
# Load the data file
self._logger.info("Loading file " + filename + '.grid')
gridfile = pd.read_csv(self.data_path + filename + '.grid')
if self.data_col != '': # load only the specified data columns
if isinstance(self.data_col, list):
data_col = self.data_col.copy()
else: # str
data_col = [self.data_col].copy()
data_col.insert(0, 'time')
data_col.insert(1, 'row_id')
data_col.insert(2, 'column_id')
gridfile = gridfile[data_col]
else: # if unspecified, load all columns
gridfile = gridfile[gridfile.columns[2:]] # all columns starting from the time column
# Build the list of time slots
self.timesolts = list(gridfile['time'][:int(gridfile.shape[0] / len(self.geo_ids))])
self.idx_of_timesolts = dict()
if not gridfile['time'].isna().any(): # no missing time values
self.timesolts = list(map(lambda x: x.replace('T', ' ').replace('Z', ''), self.timesolts))
self.timesolts = np.array(self.timesolts, dtype='datetime64[ns]')
for idx, _ts in enumerate(self.timesolts):
self.idx_of_timesolts[_ts] = idx
# Convert to a 4-d array
feature_dim = len(gridfile.columns) - 3
df = gridfile[gridfile.columns[-feature_dim:]]
len_time = len(self.timesolts)
data = []
for i in range(self.len_row):
tmp = []
for j in range(self.len_column):
index = (i * self.len_column + j) * len_time
tmp.append(df[index:index + len_time].values)
data.append(tmp)
data = np.array(data, dtype=float) # (len_row, len_column, len_time, feature_dim)
data = data.swapaxes(2, 0).swapaxes(1, 2) # (len_time, len_row, len_column, feature_dim)
self._logger.info("Loaded file " + filename + '.grid' + ', shape=' + str(data.shape))
return data
def _load_od_4d(self, filename):
"""
Load the .od file, whose columns are [dyna_id, type, time, origin_id, destination_id, properties (several columns)].
The id order in the .geo file should match the order in the .od file.
The global parameter `data_col` specifies which data columns to load; if unset, all columns are loaded.
The index arithmetic is sketched in the comment below.
Args:
filename(str): data file name, without the extension
Returns:
np.ndarray: data array, 4d-array: (len_time, num_nodes, num_nodes, feature_dim)
"""
self._logger.info("Loading file " + filename + '.od')
odfile = pd.read_csv(self.data_path + filename + '.od')
if self.data_col != '': # load only the specified data columns
if isinstance(self.data_col, list):
data_col = self.data_col.copy()
else: # str
data_col = [self.data_col].copy()
data_col.insert(0, 'time')
data_col.insert(1, 'origin_id')
data_col.insert(2, 'destination_id')
odfile = odfile[data_col]
else: # if unspecified, load all columns
odfile = odfile[odfile.columns[2:]] # all columns starting from the time column
# Build the list of time slots
self.timesolts = list(odfile['time'][:int(odfile.shape[0] / self.num_nodes / self.num_nodes)])
self.idx_of_timesolts = dict()
if not odfile['time'].isna().any(): # no missing time values
self.timesolts = list(map(lambda x: x.replace('T', ' ').replace('Z', ''), self.timesolts))
self.timesolts = np.array(self.timesolts, dtype='datetime64[ns]')
for idx, _ts in enumerate(self.timesolts):
self.idx_of_timesolts[_ts] = idx
feature_dim = len(odfile.columns) - 3
df = odfile[odfile.columns[-feature_dim:]]
len_time = len(self.timesolts)
data = np.zeros((self.num_nodes, self.num_nodes, len_time, feature_dim))
for i in range(self.num_nodes):
origin_index = i * len_time * self.num_nodes # each origin occupies len_time * num_nodes rows
for j in range(self.num_nodes):
destination_index = j * len_time # each destination occupies len_time rows
index = origin_index + destination_index
data[i][j] = df[index:index + len_time].values
data = data.transpose((2, 0, 1, 3)) # (len_time, num_nodes, num_nodes, feature_dim)
self._logger.info("Loaded file " + filename + '.od' + ', shape=' + str(data.shape))
return data
def _load_grid_od_4d(self, filename):
"""
Load the .gridod file, whose columns are [dyna_id, type, time, origin_row_id, origin_column_id,
destination_row_id, destination_column_id, properties (several columns)].
The id order in the .geo file should match the order in the .gridod file.
The global parameter `data_col` specifies which data columns to load; if unset, all columns are loaded.
Args:
filename(str): data file name, without the extension
Returns:
np.ndarray: data array, 4d-array: (len_time, num_grids, num_grids, feature_dim)
"""
# Load the data file
self._logger.info("Loading file " + filename + '.gridod')
gridodfile = pd.read_csv(self.data_path + filename + '.gridod')
if self.data_col != '': # load only the specified data columns
if isinstance(self.data_col, list):
data_col = self.data_col.copy()
else: # str
data_col = [self.data_col].copy()
data_col.insert(0, 'time')
data_col.insert(1, 'origin_row_id')
data_col.insert(2, 'origin_column_id')
data_col.insert(3, 'destination_row_id')
data_col.insert(4, 'destination_column_id')
gridodfile = gridodfile[data_col]
else: # if unspecified, load all columns
gridodfile = gridodfile[gridodfile.columns[2:]] # all columns starting from the time column
# Build the list of time slots
self.timesolts = list(gridodfile['time'][:int(gridodfile.shape[0] / len(self.geo_ids) / len(self.geo_ids))])
self.idx_of_timesolts = dict()
if not gridodfile['time'].isna().any(): # no missing time values
self.timesolts = list(map(lambda x: x.replace('T', ' ').replace('Z', ''), self.timesolts))
self.timesolts = np.array(self.timesolts, dtype='datetime64[ns]')
for idx, _ts in enumerate(self.timesolts):
self.idx_of_timesolts[_ts] = idx
# Convert to a 4-d array
feature_dim = len(gridodfile.columns) - 5
df = gridodfile[gridodfile.columns[-feature_dim:]]
len_time = len(self.timesolts)
data = np.zeros((len(self.geo_ids), len(self.geo_ids), len_time, feature_dim))
for oi in range(self.len_row):
for oj in range(self.len_column):
origin_index = (oi * self.len_column + oj) * len_time * len(self.geo_ids) # each origin occupies len_time * num_grids rows
for di in range(self.len_row):
for dj in range(self.len_column):
destination_index = (di * self.len_column + dj) * len_time # each destination occupies len_time rows
index = origin_index + destination_index
# print(index, index + len_time)
# print((oi, oj), (di, dj))
# print(oi * self.len_column + oj, di * self.len_column + dj)
data[oi * self.len_column + oj][di * self.len_column + dj] = df[index:index + len_time].values
data = data.transpose((2, 0, 1, 3)) # (len_time, num_grids, num_grids, feature_dim)
self._logger.info("Loaded file " + filename + '.gridod' + ', shape=' + str(data.shape))
return data
def _load_grid_od_6d(self, filename):
"""
Load the .gridod file, whose columns are [dyna_id, type, time, origin_row_id, origin_column_id,
destination_row_id, destination_column_id, properties (several columns)].
The id order in the .geo file should match the order in the .gridod file.
The global parameter `data_col` specifies which data columns to load; if unset, all columns are loaded.
The index arithmetic is sketched in the comment below.
Args:
filename(str): data file name, without the extension
Returns:
np.ndarray: data array, 6d-array: (len_time, len_row, len_column, len_row, len_column, feature_dim)
"""
# Load the data file
self._logger.info("Loading file " + filename + '.gridod')
gridodfile = pd.read_csv(self.data_path + filename + '.gridod')
if self.data_col != '': # load only the specified data columns
if isinstance(self.data_col, list):
data_col = self.data_col.copy()
else: # str
data_col = [self.data_col].copy()
data_col.insert(0, 'time')
data_col.insert(1, 'origin_row_id')
data_col.insert(2, 'origin_column_id')
data_col.insert(3, 'destination_row_id')
data_col.insert(4, 'destination_column_id')
gridodfile = gridodfile[data_col]
else: # if unspecified, load all columns
gridodfile = gridodfile[gridodfile.columns[2:]] # all columns starting from the time column
# Build the list of time slots
self.timesolts = list(gridodfile['time'][:int(gridodfile.shape[0] / len(self.geo_ids) / len(self.geo_ids))])
self.idx_of_timesolts = dict()
if not gridodfile['time'].isna().any(): # no missing time values
self.timesolts = list(map(lambda x: x.replace('T', ' ').replace('Z', ''), self.timesolts))
self.timesolts = np.array(self.timesolts, dtype='datetime64[ns]')
for idx, _ts in enumerate(self.timesolts):
self.idx_of_timesolts[_ts] = idx
# Convert to a 6-d array
feature_dim = len(gridodfile.columns) - 5
df = gridodfile[gridodfile.columns[-feature_dim:]]
len_time = len(self.timesolts)
data = np.zeros((self.len_row, self.len_column, self.len_row, self.len_column, len_time, feature_dim))
for oi in range(self.len_row):
for oj in range(self.len_column):
origin_index = (oi * self.len_column + oj) * len_time * len(self.geo_ids) # each origin occupies len_time * num_grids rows
for di in range(self.len_row):
for dj in range(self.len_column):
destination_index = (di * self.len_column + dj) * len_time # each destination occupies len_time rows
index = origin_index + destination_index
# print(index, index + len_time)
data[oi][oj][di][dj] = df[index:index + len_time].values
data = data.transpose((4, 0, 1, 2, 3, 5)) # (len_time, len_row, len_column, len_row, len_column, feature_dim)
self._logger.info("Loaded file " + filename + '.gridod' + ', shape=' + str(data.shape))
return data
def _load_ext(self):
"""
Load the .ext file, whose columns are [ext_id, time, properties (several columns)].
The global parameter `ext_col` specifies which columns to load; if unset, all columns are loaded.
Returns:
np.ndarray: external data array, shape: (timeslots, ext_dim)
"""
# Load the external data file
extfile = pd.read_csv(self.data_path + self.ext_file + '.ext')
if self.ext_col != '': # load only the specified columns
if isinstance(self.ext_col, list):
ext_col = self.ext_col.copy()
else: # str
ext_col = [self.ext_col].copy()
ext_col.insert(0, 'time')
extfile = extfile[ext_col]
else: # if unspecified, load all columns
extfile = extfile[extfile.columns[1:]] # all columns starting from the time column
# Build the list of time slots
self.ext_timesolts = extfile['time']
self.idx_of_ext_timesolts = dict()
if not extfile['time'].isna().any(): # no missing time values
self.ext_timesolts = list(map(lambda x: x.replace('T', ' ').replace('Z', ''), self.ext_timesolts))
self.ext_timesolts = np.array(self.ext_timesolts, dtype='datetime64[ns]')
for idx, _ts in enumerate(self.ext_timesolts):
self.idx_of_ext_timesolts[_ts] = idx
# Build the external feature array
feature_dim = len(extfile.columns) - 1
df = extfile[extfile.columns[-feature_dim:]].values
self._logger.info("Loaded file " + self.ext_file + '.ext' + ', shape=' + str(df.shape))
return df
def _generate_data(self):
"""
Load the data files (.dyna/.grid/.od/.gridod) and the external data (.ext), fuse them, and return the result as X and y
(see the sketch in the comment below).
Returns:
tuple: tuple contains:
x(np.ndarray): model input data, (num_samples, input_length, ..., feature_dim) \n
y(np.ndarray): model output data, (num_samples, output_length, ..., feature_dim)
"""
# Handle the case of multiple data files
if isinstance(self.data_files, list):
data_files = self.data_files.copy()
else: # str
data_files = [self.data_files].copy()
# Load external data
if self.load_external and os.path.exists(self.data_path + self.ext_file + '.ext'): # external data file exists
ext_data = self._load_ext()
else:
ext_data = None
x_list, y_list = [], []
for filename in data_files:
df = self._load_dyna(filename) # (len_time, ..., feature_dim)
if self.load_external:
df = self._add_external_information(df, ext_data)
x, y = self._generate_input_data(df)
# x: (num_samples, input_length, ..., input_dim)
# y: (num_samples, output_length, ..., output_dim)
x_list.append(x)
y_list.append(y)
x = np.concatenate(x_list)
y = np.concatenate(y_list)
self._logger.info("Dataset created")
self._logger.info("x shape: " + str(x.shape) + ", y shape: " + str(y.shape))
return x, y
def _split_train_val_test(self, x, y):
"""
Split the data into training, validation and test sets and cache them
(a numeric example of the split arithmetic is sketched in the comment below).
Args:
x(np.ndarray): input data (num_samples, input_length, ..., feature_dim)
y(np.ndarray): output data (num_samples, output_length, ..., feature_dim)
Returns:
tuple: tuple contains:
x_train: (num_samples, input_length, ..., feature_dim) \n
y_train: (num_samples, output_length, ..., feature_dim) \n
x_val: (num_samples, input_length, ..., feature_dim) \n
y_val: (num_samples, output_length, ..., feature_dim) \n
x_test: (num_samples, input_length, ..., feature_dim) \n
y_test: (num_samples, output_length, ..., feature_dim)
"""
test_rate = 1 - self.train_rate - self.eval_rate
num_samples = x.shape[0]
num_test = round(num_samples * test_rate)
num_train = round(num_samples * self.train_rate)
num_val = num_samples - num_test - num_train
# train
x_train, y_train = x[:num_train], y[:num_train]
# val
x_val, y_val = x[num_train: num_train + num_val], y[num_train: num_train + num_val]
# test
x_test, y_test = x[-num_test:], y[-num_test:]
self._logger.info("train\t" + "x: " + str(x_train.shape) + ", y: " + str(y_train.shape))
self._logger.info("eval\t" + "x: " + str(x_val.shape) + ", y: " + str(y_val.shape))
self._logger.info("test\t" + "x: " + str(x_test.shape) + ", y: " + str(y_test.shape))
if self.cache_dataset:
ensure_dir(self.cache_file_folder)
np.savez_compressed(
self.cache_file_name,
x_train=x_train,
y_train=y_train,
x_test=x_test,
y_test=y_test,
x_val=x_val,
y_val=y_val,
)
self._logger.info('Saved at ' + self.cache_file_name)
return x_train, y_train, x_val, y_val, x_test, y_test
def _generate_train_val_test(self):
"""
Load the dataset, split it into training, validation and test sets, and cache them
Returns:
tuple: tuple contains:
x_train: (num_samples, input_length, ..., feature_dim) \n
y_train: (num_samples, output_length, ..., feature_dim) \n
x_val: (num_samples, input_length, ..., feature_dim) \n
y_val: (num_samples, output_length, ..., feature_dim) \n
x_test: (num_samples, input_length, ..., feature_dim) \n
y_test: (num_samples, output_length, ..., feature_dim)
"""
x, y = self._generate_data()
return self._split_train_val_test(x, y)
def _load_cache_train_val_test(self):
"""
Load the previously cached training, validation and test sets
Returns:
tuple: tuple contains:
x_train: (num_samples, input_length, ..., feature_dim) \n
y_train: (num_samples, output_length, ..., feature_dim) \n
x_val: (num_samples, input_length, ..., feature_dim) \n
y_val: (num_samples, output_length, ..., feature_dim) \n
x_test: (num_samples, input_length, ..., feature_dim) \n
y_test: (num_samples, output_length, ..., feature_dim)
"""
self._logger.info('Loading ' + self.cache_file_name)
cat_data = np.load(self.cache_file_name)
x_train = cat_data['x_train']
y_train = cat_data['y_train']
x_test = cat_data['x_test']
y_test = cat_data['y_test']
x_val = cat_data['x_val']
y_val = cat_data['y_val']
self._logger.info("train\t" + "x: " + str(x_train.shape) + ", y: " + str(y_train.shape))
self._logger.info("eval\t" + "x: " + str(x_val.shape) + ", y: " + str(y_val.shape))
self._logger.info("test\t" + "x: " + str(x_test.shape) + ", y: " + str(y_test.shape))
return x_train, y_train, x_val, y_val, x_test, y_test
def _get_scalar(self, scaler_type, x_train, y_train):
"""
Select the normalization method according to the global parameter `scaler_type`
(see the comment below for the available values).
Args:
scaler_type(str): type of the scaler
x_train: training data X
y_train: training data y
Returns:
Scaler: the scaler object
"""
if scaler_type == "normal":
scaler = NormalScaler(maxx=max(x_train.max(), y_train.max()))
self._logger.info('NormalScaler max: ' + str(scaler.max))
elif scaler_type == "standard":
scaler = StandardScaler(mean=x_train.mean(), std=x_train.std())
self._logger.info('StandardScaler mean: ' + str(scaler.mean) + ', std: ' + str(scaler.std))
elif scaler_type == "minmax01":
scaler = MinMax01Scaler(
maxx=max(x_train.max(), y_train.max()), minn=min(x_train.min(), y_train.min()))
self._logger.info('MinMax01Scaler max: ' + str(scaler.max) + ', min: ' + str(scaler.min))
elif scaler_type == "minmax11":
scaler = MinMax11Scaler(
maxx=max(x_train.max(), y_train.max()), minn=min(x_train.min(), y_train.min()))
self._logger.info('MinMax11Scaler max: ' + str(scaler.max) + ', min: ' + str(scaler.min))
elif scaler_type == "log":
scaler = LogScaler()
self._logger.info('LogScaler')
elif scaler_type == "none":
scaler = NoneScaler()
self._logger.info('NoneScaler')
else:
raise ValueError('Scaler type error!')
return scaler
def get_data(self):
"""
Return the DataLoaders for the training, validation and test data
(the normalization of the feature channels is sketched in the comment below).
Returns:
tuple: tuple contains:
train_dataloader: Dataloader composed of Batch (class) \n
eval_dataloader: Dataloader composed of Batch (class) \n
test_dataloader: Dataloader composed of Batch (class)
"""
# Load the dataset
x_train, y_train, x_val, y_val, x_test, y_test = [], [], [], [], [], []
if self.data is None:
self.data = {}
if self.cache_dataset and os.path.exists(self.cache_file_name):
x_train, y_train, x_val, y_val, x_test, y_test = self._load_cache_train_val_test()
else:
x_train, y_train, x_val, y_val, x_test, y_test = self._generate_train_val_test()
# Normalize the data
self.feature_dim = x_train.shape[-1]
self.ext_dim = self.feature_dim - self.output_dim
self.scaler = self._get_scalar(self.scaler_type,
x_train[..., :self.output_dim], y_train[..., :self.output_dim])
self.ext_scaler = self._get_scalar(self.ext_scaler_type,
x_train[..., self.output_dim:], y_train[..., self.output_dim:])
x_train[..., :self.output_dim] = self.scaler.transform(x_train[..., :self.output_dim])
y_train[..., :self.output_dim] = self.scaler.transform(y_train[..., :self.output_dim])
x_val[..., :self.output_dim] = self.scaler.transform(x_val[..., :self.output_dim])
y_val[..., :self.output_dim] = self.scaler.transform(y_val[..., :self.output_dim])
x_test[..., :self.output_dim] = self.scaler.transform(x_test[..., :self.output_dim])
y_test[..., :self.output_dim] = self.scaler.transform(y_test[..., :self.output_dim])
if self.normal_external:
x_train[..., self.output_dim:] = self.ext_scaler.transform(x_train[..., self.output_dim:])
y_train[..., self.output_dim:] = self.ext_scaler.transform(y_train[..., self.output_dim:])
x_val[..., self.output_dim:] = self.ext_scaler.transform(x_val[..., self.output_dim:])
y_val[..., self.output_dim:] = self.ext_scaler.transform(y_val[..., self.output_dim:])
x_test[..., self.output_dim:] = self.ext_scaler.transform(x_test[..., self.output_dim:])
y_test[..., self.output_dim:] = self.ext_scaler.transform(y_test[..., self.output_dim:])
# Pair X and y of the training set into a list; the validation and test sets are handled the same way
# x_train/y_train: (num_samples, input_length, ..., feature_dim)
# train_data(list): train_data[i] is a tuple consisting of x_train[i] and y_train[i]
train_data = list(zip(x_train, y_train))
eval_data = list(zip(x_val, y_val))
test_data = list(zip(x_test, y_test))
# Convert to DataLoaders
self.train_dataloader, self.eval_dataloader, self.test_dataloader = \
generate_dataloader(train_data, eval_data, test_data, self.feature_name,
self.batch_size, self.num_workers, pad_with_last_sample=self.pad_with_last_sample)
self.num_batches = len(self.train_dataloader)
return self.train_dataloader, self.eval_dataloader, self.test_dataloader
def get_data_feature(self):
"""
Return the features of the dataset; subclasses must implement this function and return the necessary features
Returns:
dict: a dictionary containing the relevant features of the dataset
"""
raise NotImplementedError('Please implement the function `get_data_feature()`.')