import os
import random
from logging import getLogger
import numpy as np
import pandas as pd
from libcity.data.dataset import AbstractDataset
from libcity.data.utils import generate_dataloader
from libcity.utils import ensure_dir
class Alias:
    def __init__(self, prob):
        """
        Build an alias table for O(1) sampling from the specified distribution.
        Args:
            prob: list, the target probability distribution
        """
        length = len(prob)
        self.length = length
        accept, alias = [0] * length, [0] * length
        insufficient, exceed = [], []
        prob_ = np.array(prob) * length
        for i, p in enumerate(prob_):
            if p < 1.0:
                insufficient.append(i)
            else:
                exceed.append(i)
        while insufficient and exceed:
            small_idx, large_idx = insufficient.pop(), exceed.pop()
            accept[small_idx] = prob_[small_idx]
            alias[small_idx] = large_idx
            prob_[large_idx] = prob_[large_idx] - (1 - prob_[small_idx])
            if prob_[large_idx] < 1.0:
                insufficient.append(large_idx)
            else:
                exceed.append(large_idx)
        while exceed:
            large_idx = exceed.pop()
            accept[large_idx] = 1
        while insufficient:
            small_idx = insufficient.pop()
            accept[small_idx] = 1
        self.accept = accept
        self.alias = alias
    def sample(self):
        """Draw one index according to the target distribution in O(1) time."""
        idx = random.randint(0, self.length - 1)
        if random.random() >= self.accept[idx]:
            return self.alias[idx]
        else:
            return idx
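
# Illustrative only (not part of the library): a quick sanity check of the
# alias sampler above. The empirical frequencies should approach the target
# distribution as the number of draws grows.
#
#     prob = [0.1, 0.2, 0.3, 0.4]
#     sampler = Alias(prob)
#     counts = np.zeros(len(prob))
#     for _ in range(100000):
#         counts[sampler.sample()] += 1
#     print(counts / counts.sum())  # roughly [0.1, 0.2, 0.3, 0.4]
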
class LINEDataset(AbstractDataset):
    def __init__(self, config):
        # dataset parameters
        self.dataset = config.get('dataset')
        self.negative_ratio = config.get('negative_ratio', 5)  # number of negative samples; 2-5 works well for large datasets
        self.batch_size = config.get('batch_size', 32)
        self.times = config.get('times')  # how many times the edge set is sampled
        self.scaler = None
        # train/eval/test split ratios
        self.train_rate = config.get('train_rate', 0.7)
        self.eval_rate = config.get('eval_rate', 0.1)
        self.scaler_type = config.get('scaler', 'none')
        # caching
        self.cache_dataset = config.get('cache_dataset', True)
        self.parameters_str = \
            str(self.dataset) + '_' + str(self.train_rate) + '_' \
            + str(self.eval_rate) + '_' + str(self.scaler_type)
        self.cache_file_name = os.path.join('./libcity/cache/dataset_cache/',
                                            'road_rep_{}.npz'.format(self.parameters_str))
        self.cache_file_folder = './libcity/cache/dataset_cache/'
        ensure_dir(self.cache_file_folder)
        self.data_path = './raw_data/' + self.dataset + '/'
        if not os.path.exists(self.data_path):
            raise ValueError("Dataset {} does not exist! Please ensure the path "
                             "'./raw_data/{}/' exists!".format(self.dataset, self.dataset))
        # atomic files
        self.geo_file = config.get('geo_file', self.dataset)
        self.rel_file = config.get('rel_file', self.dataset)
        # framework-related
        self._logger = getLogger()
        self.feature_name = {'I': 'int', 'J': 'int', 'Neg': 'int'}
        self.num_workers = config.get('num_workers', 0)
        self._load_geo()
        self._load_rel()
        # total number of samples to draw
        self.num_samples = self.num_edges * (1 + self.negative_ratio) * self.times
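
    # Illustrative only: the keys read from `config` above, with hypothetical
    # values. 'times' has no default, so the caller must supply it.
    #
    #     config = {
    #         'dataset': 'BJ_roadmap',   # any directory under ./raw_data/
    #         'negative_ratio': 5,
    #         'batch_size': 32,
    #         'times': 10,               # number of sampling passes over the edges
    #         'train_rate': 0.7,
    #         'eval_rate': 0.1,
    #         'cache_dataset': True,
    #         'num_workers': 0,
    #     }
    #     dataset = LINEDataset(config)
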
    def _load_geo(self):
        """
        Load the .geo file, whose columns are [geo_id, type, coordinates, properties (several columns)].
        """
        geofile = pd.read_csv(self.data_path + self.geo_file + '.geo')
        self.geo_ids = list(geofile['geo_id'])
        self.num_nodes = len(self.geo_ids)
        self._geo_to_ind = {}
        for index, idx in enumerate(self.geo_ids):
            self._geo_to_ind[idx] = index
        self._logger.info("Loaded file " + self.geo_file + '.geo' + ', num_nodes=' + str(self.num_nodes))
    def _load_rel(self):
        """
        Load the .rel file, whose columns are [rel_id, type, origin_id, destination_id, properties (several columns)],
        and build the edge list self.edges of (origin index, destination index, weight) triples.
        If the file has no 'weight' column, every edge listed in the file gets weight 1.
        """
        map_info = pd.read_csv(self.data_path + self.rel_file + '.rel')
        if 'weight' in map_info.columns:
            self.edges = [(self._geo_to_ind[e[0]], self._geo_to_ind[e[1]], e[2]) for e in
                          map_info[['origin_id', 'destination_id', 'weight']].values]
        else:
            self.edges = [(self._geo_to_ind[e[0]], self._geo_to_ind[e[1]], 1) for e in
                          map_info[['origin_id', 'destination_id']].values]
        self.num_edges = len(self.edges)
        self._logger.info("Loaded file " + self.rel_file + '.rel' + ', num_edges=' + str(self.num_edges))
    def _gen_sampling_table(self, POW=0.75):
        """
        Build the two alias tables used for sampling: one over nodes (for negative
        sampling, with probability proportional to weighted out-degree ** POW) and
        one over edges (with probability proportional to edge weight).
        """
        node_degree = np.zeros(self.num_nodes)
        for edge in self.edges:
            node_degree[edge[0]] += edge[2]
        # alias table for node negative sampling
        norm_prob = node_degree ** POW
        norm_prob = norm_prob / norm_prob.sum()
        self.node_alias = Alias(norm_prob)
        # alias table for edge sampling
        total_weight = 0
        for edge in self.edges:
            total_weight += edge[2]
        norm_prob = [edge[2] / total_weight for edge in self.edges]
        self.edge_alias = Alias(norm_prob)
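
    # Illustrative only: for weighted out-degrees d = [4, 1, 1], the negative-
    # sampling (noise) distribution is d ** 0.75 / sum(d ** 0.75)
    # ≈ [2.83, 1.0, 1.0] / 4.83 ≈ [0.59, 0.21, 0.21], i.e. high-degree nodes are
    # still favoured, but less sharply than under the raw-degree distribution
    # [0.67, 0.17, 0.17].
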
    def _generate_data(self):
        """
        LINE is trained in a Skip-Gram-like fashion: analogously to Word2Vec (Skip-Gram),
        a word pair plays the role of an edge in the graph. LINE adds two optimizations:
        edges are sampled with probability proportional to their weight, and, as in
        Word2Vec's negative sampling, each sampled edge is paired with several "negative"
        edges from its source node to target nodes drawn with probability proportional
        to degree ** 0.75. Finally, to turn Python's uniform random numbers into draws
        from these target distributions, the O(1) alias sampling method is used.
        """
        # load the dataset
        self._load_geo()
        self._load_rel()
        # build the sampling (alias) tables
        self._gen_sampling_table()
        I = []  # source nodes
        J = []  # target nodes
        Neg = []  # +1 for a positive sample, -1 for a negative sample
        pad_sample = self.num_samples % (1 + self.negative_ratio)
        for _ in range(self.num_samples // (1 + self.negative_ratio)):
            # positive sample
            edge = self.edges[self.edge_alias.sample()]
            I.append(edge[0])
            J.append(edge[1])
            Neg.append(1)
            # negative samples
            for _ in range(self.negative_ratio):
                I.append(edge[0])
                J.append(self.node_alias.sample())
                Neg.append(-1)
        # pad to fill the epoch
        if pad_sample > 0:
            edge = self.edges[self.edge_alias.sample()]
            I.append(edge[0])
            J.append(edge[1])
            Neg.append(1)
            pad_sample -= 1
            if pad_sample > 0:
                for _ in range(pad_sample):
                    I.append(edge[0])
                    J.append(self.node_alias.sample())
                    Neg.append(-1)
        test_rate = 1 - self.train_rate - self.eval_rate
        num_test = round(self.num_samples * test_rate)
        num_train = round(self.num_samples * self.train_rate)
        num_eval = self.num_samples - num_test - num_train
        # train
        I_train, J_train, Neg_train = I[:num_train], J[:num_train], Neg[:num_train]
        # eval
        I_eval, J_eval, Neg_eval = I[num_train:num_train + num_eval], J[num_train:num_train + num_eval], \
            Neg[num_train:num_train + num_eval]
        # test
        I_test, J_test, Neg_test = I[-num_test:], J[-num_test:], Neg[-num_test:]
        self._logger.info(
            "train\tI: {}, J: {}, Neg: {}".format(str(len(I_train)), str(len(J_train)), str(len(Neg_train))))
        self._logger.info(
            "eval\tI: {}, J: {}, Neg: {}".format(str(len(I_eval)), str(len(J_eval)), str(len(Neg_eval))))
        self._logger.info(
            "test\tI: {}, J: {}, Neg: {}".format(str(len(I_test)), str(len(J_test)), str(len(Neg_test))))
        if self.cache_dataset:
            ensure_dir(self.cache_file_folder)
            np.savez_compressed(
                self.cache_file_name,
                I_train=I_train,
                J_train=J_train,
                Neg_train=Neg_train,
                I_test=I_test,
                J_test=J_test,
                Neg_test=Neg_test,
                I_eval=I_eval,
                J_eval=J_eval,
                Neg_eval=Neg_eval
            )
            self._logger.info('Saved at ' + self.cache_file_name)
        return I_train, J_train, Neg_train, I_eval, J_eval, Neg_eval, I_test, J_test, Neg_test
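
    # Illustrative only: with num_edges = 100, negative_ratio = 5 and times = 10,
    # num_samples = 100 * (1 + 5) * 10 = 6000 triples; with train_rate = 0.7 and
    # eval_rate = 0.1 they are split into 4200 / 600 / 1200 train / eval / test
    # rows, each row being (source node I, target node J, label Neg in {+1, -1}).
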
    def _load_cache(self):
        """
        Load the previously cached train, eval and test splits.
        """
        self._logger.info('Loading ' + self.cache_file_name)
        cat_data = np.load(self.cache_file_name)
        I_train = cat_data['I_train']
        J_train = cat_data['J_train']
        Neg_train = cat_data['Neg_train']
        I_test = cat_data['I_test']
        J_test = cat_data['J_test']
        Neg_test = cat_data['Neg_test']
        I_eval = cat_data['I_eval']
        J_eval = cat_data['J_eval']
        Neg_eval = cat_data['Neg_eval']
        self._logger.info(
            "train\tI: {}, J: {}, Neg: {}".format(str(len(I_train)), str(len(J_train)), str(len(Neg_train))))
        self._logger.info(
            "eval\tI: {}, J: {}, Neg: {}".format(str(len(I_eval)), str(len(J_eval)), str(len(Neg_eval))))
        self._logger.info(
            "test\tI: {}, J: {}, Neg: {}".format(str(len(I_test)), str(len(J_test)), str(len(Neg_test))))
        return I_train, J_train, Neg_train, I_eval, J_eval, Neg_eval, I_test, J_test, Neg_test
    def get_data(self):
        """
        Return the DataLoaders for the training, evaluation and test data.
        Returns:
            tuple: train_dataloader, eval_dataloader, test_dataloader
        """
        # load the dataset, from cache if available
        if self.cache_dataset and os.path.exists(self.cache_file_name):
            I_train, J_train, Neg_train, I_eval, J_eval, Neg_eval, I_test, J_test, Neg_test = self._load_cache()
        else:
            I_train, J_train, Neg_train, I_eval, J_eval, Neg_eval, I_test, J_test, Neg_test = self._generate_data()
        train_data = list(zip(I_train, J_train, Neg_train))
        eval_data = list(zip(I_eval, J_eval, Neg_eval))
        test_data = list(zip(I_test, J_test, Neg_test))
        self.train_dataloader, self.eval_dataloader, self.test_dataloader = \
            generate_dataloader(train_data, eval_data, test_data, self.feature_name, self.batch_size, self.num_workers)
        return self.train_dataloader, self.eval_dataloader, self.test_dataloader
    def get_data_feature(self):
        """
        Return a dict of features that describe the dataset.
        Returns:
            dict: feature dictionary containing the scaler, the number of edges
                and the number of nodes
        """
        return {"scaler": self.scaler, "num_edges": self.num_edges,
                "num_nodes": self.num_nodes}