Source code for libcity.model.trajectory_loc_prediction.RNN

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pack_padded_sequence
from torch.nn.utils.rnn import pad_packed_sequence

from libcity.model.abstract_model import AbstractModel


[docs]class RNN(AbstractModel): def __init__(self, config, data_feature): super(RNN, self).__init__(config, data_feature) self.loc_size = data_feature['loc_size'] self.loc_emb_size = config['loc_emb_size'] self.tim_size = data_feature['tim_size'] self.tim_emb_size = config['tim_emb_size'] self.hidden_size = config['hidden_size'] self.device = config['device'] self.rnn_type = config['rnn_type'] self.evaluate_method = config['evaluate_method'] self.emb_loc = nn.Embedding( self.loc_size, self.loc_emb_size, padding_idx=data_feature['loc_pad']) self.emb_tim = nn.Embedding( self.tim_size, self.tim_emb_size, padding_idx=data_feature['tim_pad']) input_size = self.loc_emb_size + self.tim_emb_size if self.rnn_type == 'GRU': self.rnn = nn.GRU(input_size, self.hidden_size, 1) elif self.rnn_type == 'LSTM': self.rnn = nn.LSTM(input_size, self.hidden_size, 1) elif self.rnn_type == 'RNN': self.rnn = nn.RNN(input_size, self.hidden_size, 1) self.init_weights() self.fc = nn.Linear(self.hidden_size, self.loc_size) self.dropout = nn.Dropout(p=config['dropout_p'])
[docs] def init_weights(self): """ Here we reproduce Keras default initialization weights for consistency with Keras version """ ih = (param.data for name, param in self.named_parameters() if 'weight_ih' in name) hh = (param.data for name, param in self.named_parameters() if 'weight_hh' in name) b = (param.data for name, param in self.named_parameters() if 'bias' in name) for t in ih: nn.init.xavier_uniform_(t) for t in hh: nn.init.orthogonal_(t) for t in b: nn.init.constant_(t, 0)
[docs] def forward(self, batch): loc = batch['current_loc'] tim = batch['current_tim'] loc_len = batch.get_origin_len('current_loc') batch_size = loc.shape[0] h1 = torch.zeros(1, batch_size, self.hidden_size).to(self.device) c1 = torch.zeros(1, batch_size, self.hidden_size).to(self.device) loc_emb = self.emb_loc(loc) tim_emb = self.emb_tim(tim) # change batch * seq * input_size to seq * batch * input_size x = torch.cat((loc_emb, tim_emb), 2).permute(1, 0, 2) x = self.dropout(x) # pack x and history_x pack_x = pack_padded_sequence(x, lengths=loc_len, enforce_sorted=False) if self.rnn_type == 'GRU' or self.rnn_type == 'RNN': out, h1 = self.rnn(pack_x, h1) elif self.rnn_type == 'LSTM': out, (h1, c1) = self.rnn(pack_x, (h1, c1)) # out = out.squeeze(1) out, out_len = pad_packed_sequence(out, batch_first=True) # out = out.permute(1, 0, 2) origin_len = batch.get_origin_len('current_loc') final_out_index = torch.tensor(origin_len) - 1 final_out_index = final_out_index.reshape(final_out_index.shape[0], 1, -1) final_out_index = final_out_index.repeat(1, 1, self.hidden_size).to(self.device) out = torch.gather(out, 1, final_out_index).squeeze(1) # batch_size * hidden_size out = F.selu(out) out = self.dropout(out) y = self.fc(out) score = F.log_softmax(y, dim=1) # calculate loss by NLLoss return score
[docs] def predict(self, batch): score = self.forward(batch) if self.evaluate_method == 'sample': # build pos_neg_inedx pos_neg_index = torch.cat((batch['target'].unsqueeze(1), batch['neg_loc']), dim=1) score = torch.gather(score, 1, pos_neg_index) return score
[docs] def calculate_loss(self, batch): criterion = nn.NLLLoss().to(self.device) scores = self.forward(batch) return criterion(scores, batch['target'])