Source code for libcity.model.traffic_flow_prediction.ToGCN

from libcity.model.abstract_traffic_state_model import AbstractTrafficStateModel
import torch
import torch.nn as nn
import torch.nn.functional as F
from libcity.model import loss
import math
import random

dtype = torch.float


class GraphConvolution(nn.Module):
    def __init__(self, input_size, output_size, device):
        super(GraphConvolution, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.weight = nn.Parameter(torch.zeros((input_size, output_size), device=device, dtype=dtype),
                                   requires_grad=True)
        self.bias = nn.Parameter(torch.zeros(output_size, device=device, dtype=dtype), requires_grad=True)
        self.init_parameters()

    def init_parameters(self):
        stdv = 1. / math.sqrt(self.weight.size(1))
        self.weight.data.uniform_(-stdv, stdv)
        self.bias.data.uniform_(-stdv, stdv)

    def forward(self, x, A):
        x = torch.einsum("ijk, kl->ijl", [x, self.weight])
        x = torch.einsum("ij, kjl->kil", [A, x])
        x = x + self.bias
        return x
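
    # The two einsum calls in forward() implement the propagation rule
    # A @ x @ self.weight + self.bias: assuming x has shape
    # (batch_size, num_nodes, input_size) and A has shape (num_nodes, num_nodes),
    # "ijk, kl->ijl" applies the weight matrix per node and "ij, kjl->kil"
    # aggregates neighbor features through A, yielding an output of shape
    # (batch_size, num_nodes, output_size).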


class GCN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, device):
        super(GCN, self).__init__()
        self.gcn1 = GraphConvolution(input_size, hidden_size, device)
        self.gcn2 = GraphConvolution(hidden_size, output_size, device)
        # self.gcn = GraphConvolution(input_size, output_size)

    def forward(self, x, A):
        x = self.gcn1(x, A)
        x = F.relu(x)
        # x = F.dropout(x, training=self.training)
        x = self.gcn2(x, A)
        x = F.relu(x)
        # x = self.gcn(x, A)
        return x


class Encoder(nn.Module):
    def __init__(self, input_size, feature_size, hidden_size, device):
        super(Encoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.gcn = GCN(input_size=feature_size, hidden_size=128, output_size=1, device=device)
        self.lstm = nn.LSTM(
            input_size=self.input_size,
            hidden_size=self.hidden_size,
            num_layers=2,
            batch_first=True,
            dropout=0.5
        )
        self.device = device

    def forward(self, x, A, hidden=None):
        # x holds a single time step: (batch_size, num_nodes, feature_dim)
        # batch_size, timestep, N = x.size()
        # gcn_in = x.view((batch_size * timestep, -1))
        # gcn_out = self.gcn(gcn_in, A)
        # encoder_in = gcn_out.view((batch_size, timestep, -1))
        x = x.view((x.size(0), x.size(1), -1))
        x = self.gcn(x, A)
        encoder_in = x.reshape((x.size(0), 1, -1))
        encoder_out, encoder_states = self.lstm(encoder_in, hidden)
        return encoder_out, encoder_states

    def init_hidden(self, x):
        return torch.zeros((2, x.size(0), self.hidden_size), device=self.device, dtype=dtype)
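
    # Note: init_hidden is not used by ToGCN.forward (the first encoder call
    # passes hidden=None); if it were used, nn.LSTM would expect its hidden
    # state as a tuple (h_0, c_0) rather than a single tensor.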


class Decoder(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, device):
        super(Decoder, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.lstm = nn.LSTM(
            input_size=self.input_size,
            hidden_size=self.hidden_size,
            num_layers=2,
            batch_first=True,
            dropout=0.5
        )
        self.dense = nn.Linear(self.hidden_size, self.output_size)
        self.device = device

    def forward(self, x, hidden=None):
        x = x.view(x.size(0), 1, -1)
        x, decoder_states = self.lstm(x, hidden)
        x = x.view(x.size(0), -1)
        # x = F.relu(x)
        x = self.dense(x)
        decoder_out = F.relu(x)
        return decoder_out, decoder_states

    def init_hidden(self, x):
        return torch.zeros((2, x.size(0), self.hidden_size), device=self.device, dtype=dtype)


class ToGCN(AbstractTrafficStateModel):
    def __init__(self, config, data_feature):
        super().__init__(config, data_feature)
        torch.autograd.set_detect_anomaly(True)
        # get data feature
        self.device = config.get('device', torch.device('cpu'))
        self.adj_mx = torch.tensor(self.data_feature.get('adj_mx'), device=self.device)
        self.num_nodes = self.data_feature.get('num_nodes', 1)
        self.feature_dim = self.data_feature.get('feature_dim', 1)
        self.output_dim = self.data_feature.get('output_dim', 1)
        self._scaler = self.data_feature.get('scaler')
        # get model config
        self.hidden_size = config.get('hidden_size', 128)
        self.decoder_t = config.get('decoder_t', 3)
        self.teacher_forcing_ratio = config.get('teacher_forcing_ratio', 0.5)
        # define the model structure
        self.encoder = Encoder(self.num_nodes, self.feature_dim, self.hidden_size, self.device)
        self.decoder = Decoder(self.num_nodes * self.output_dim, self.hidden_size,
                               self.num_nodes * self.output_dim, self.device)
        self.linear = nn.Linear(in_features=self.feature_dim, out_features=self.output_dim)

    def forward(self, batch):
        input_tensor = batch['X']
        target_tensor = batch['y']
        timestep_1 = input_tensor.shape[1]  # length of the input time window (10 min per step)
        timestep_2 = target_tensor.shape[1]  # length of the output time window (10 min per step)
        # Encode the history flow maps one time step at a time
        encoder_hidden = None
        for ei in range(timestep_1):
            encoder_input = input_tensor[:, ei]
            encoder_output, encoder_hidden = self.encoder(encoder_input, self.adj_mx, encoder_hidden)
        # Decode to predict the future flow maps: first warm up the decoder
        # hidden state on recent input steps
        decoder_hidden = encoder_hidden
        for di in range(self.decoder_t):
            decoder_input = self.linear(input_tensor[:, timestep_1 - (self.decoder_t - di) - 1].clone())
            decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
        decoder_input = self.linear(input_tensor[:, timestep_1 - 1].clone())
        # Teacher forcing mechanism
        use_teacher_forcing = random.random() < self.teacher_forcing_ratio
        decoder_outputs = []
        if use_teacher_forcing:
            # Feed the ground truth of the previous step as the next decoder input
            for di in range(timestep_2):
                decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
                decoder_outputs.append(decoder_output)
                decoder_input = self.linear(target_tensor[:, di].clone())
        else:
            # Feed the decoder's own prediction back as the next input
            for di in range(timestep_2):
                decoder_output, decoder_hidden = self.decoder(decoder_input, decoder_hidden)
                decoder_outputs.append(decoder_output)
                decoder_input = decoder_output
        y_preds = torch.stack(decoder_outputs, dim=1)  # multi-step prediction
        y_preds = y_preds.unsqueeze(3)
        return y_preds
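
    # Shape note (assuming the usual LibCity layout): batch['X'] is
    # (batch_size, timestep_1, num_nodes, feature_dim) and batch['y'] is
    # (batch_size, timestep_2, num_nodes, output_dim); each decoder step emits
    # (batch_size, num_nodes * output_dim), so y_preds has shape
    # (batch_size, timestep_2, num_nodes * output_dim, 1) after unsqueeze(3).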

    def predict(self, batch):
        return self.forward(batch)

    def calculate_loss(self, batch):
        y_true = batch['y']  # ground-truth values
        y_predicted = self.predict(batch)  # prediction results
        # denormalize the values
        y_true = self._scaler.inverse_transform(y_true[..., :self.output_dim])
        y_predicted = self._scaler.inverse_transform(y_predicted[..., :self.output_dim])
        # call the masked MAE loss function defined in `loss.py`
        return loss.masked_mae_torch(y_predicted, y_true, 0)
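

if __name__ == '__main__':
    # Minimal shape-check sketch with synthetic data; it is not part of the
    # LibCity pipeline, and the sizes below are arbitrary assumptions chosen
    # only to illustrate how the sub-modules fit together.
    batch_size, num_nodes, feature_dim, hidden_size = 2, 4, 1, 16
    device = torch.device('cpu')
    A = torch.eye(num_nodes, device=device, dtype=dtype)  # stand-in adjacency matrix
    x = torch.rand((batch_size, num_nodes, feature_dim), device=device, dtype=dtype)

    # Two stacked graph convolutions (A @ x @ W + b) with ReLU: (2, 4, 1) -> (2, 4, 1)
    gcn = GCN(input_size=feature_dim, hidden_size=8, output_size=1, device=device)
    print(gcn(x, A).shape)

    # One encoder step followed by one decoder step, reusing the LSTM states
    encoder = Encoder(num_nodes, feature_dim, hidden_size, device)
    decoder = Decoder(num_nodes, hidden_size, num_nodes, device)
    encoder_out, encoder_hidden = encoder(x, A)               # (2, 1, 16)
    decoder_out, _ = decoder(x.squeeze(-1), encoder_hidden)   # (2, 4)
    print(encoder_out.shape, decoder_out.shape)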