Source code for libcity.executor.eta_executor

import os
import time
import json
import numpy as np
import torch
from libcity.executor.traffic_state_executor import TrafficStateExecutor
from libcity.model import loss
from functools import partial


class ETAExecutor(TrafficStateExecutor):
    def __init__(self, config, model, data_feature):
        super().__init__(config, model, data_feature)
        self.output_pred = config.get("output_pred", True)
        self.output_dim = None
        self._scalar = None
    def _build_train_loss(self):
        """
        Select the training loss function according to the global parameter `train_loss`.
        If the parameter is 'none', the loss function defined by the model itself is used.
        Note that the loss function must take a `Batch` object as input and return the
        corresponding loss as a torch.Tensor.
        """
        if self.train_loss.lower() == 'none':
            self._logger.warning('Received none train loss func and will use the loss func defined in the model.')
            return None
        if self.train_loss.lower() not in ['mae', 'mse', 'rmse', 'mape', 'logcosh', 'huber', 'quantile',
                                           'masked_mae', 'masked_mse', 'masked_rmse', 'masked_mape', 'r2', 'evar']:
            self._logger.warning('Received unrecognized train loss function, set default mae loss func.')
        else:
            self._logger.info('You select `{}` as train loss function.'.format(self.train_loss.lower()))

        def func(batch):
            y_true = batch['time']
            if y_true.dim() == 1:
                y_true = y_true.view(-1, 1)
            y_predicted = self.model.predict(batch)
            if self.train_loss.lower() == 'mae':
                lf = loss.masked_mae_torch
            elif self.train_loss.lower() == 'mse':
                lf = loss.masked_mse_torch
            elif self.train_loss.lower() == 'rmse':
                lf = loss.masked_rmse_torch
            elif self.train_loss.lower() == 'mape':
                lf = loss.masked_mape_torch
            elif self.train_loss.lower() == 'logcosh':
                lf = loss.log_cosh_loss
            elif self.train_loss.lower() == 'huber':
                lf = loss.huber_loss
            elif self.train_loss.lower() == 'quantile':
                lf = loss.quantile_loss
            elif self.train_loss.lower() == 'masked_mae':
                lf = partial(loss.masked_mae_torch, null_val=0)
            elif self.train_loss.lower() == 'masked_mse':
                lf = partial(loss.masked_mse_torch, null_val=0)
            elif self.train_loss.lower() == 'masked_rmse':
                lf = partial(loss.masked_rmse_torch, null_val=0)
            elif self.train_loss.lower() == 'masked_mape':
                lf = partial(loss.masked_mape_torch, null_val=0)
            elif self.train_loss.lower() == 'r2':
                lf = loss.r2_score_torch
            elif self.train_loss.lower() == 'evar':
                lf = loss.explained_variance_score_torch
            else:
                lf = loss.masked_mae_torch
            return lf(y_predicted, y_true)

        return func
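    # A minimal sketch of the masking behaviour that the 'masked_*' options select
    # above (illustrative values only; it assumes the usual masked-loss semantics of
    # libcity.model.loss, where labels equal to null_val are dropped from the average):
    #
    #   preds = torch.tensor([[10.0], [20.0]])
    #   labels = torch.tensor([[12.0], [0.0]])  # 0 marks a missing travel time
    #   loss.masked_mae_torch(preds, labels)                        # 'mae': plain MAE over both rows
    #   partial(loss.masked_mae_torch, null_val=0)(preds, labels)   # 'masked_mae': the zero label is ignored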
    def evaluate(self, test_dataloader):
        """
        Use the model to evaluate the test data.

        Args:
            test_dataloader(torch.utils.data.DataLoader): the test dataloader
        """
        self._logger.info('Start evaluating ...')
        with torch.no_grad():
            self.model.eval()
            y_truths = []
            y_preds = []
            test_pred = {}
            for batch in test_dataloader:
                batch.to_tensor(self.device)
                output = self.model.predict(batch)
                y_true = batch['time']
                y_pred = output
                y_truths.append(y_true.cpu().numpy())
                y_preds.append(y_pred.cpu().numpy())
                if self.output_pred:
                    for i in range(y_pred.shape[0]):
                        uid = batch['uid'][i].cpu().long().numpy()[0]
                        # keys of test_pred are stored as strings, so check the string form
                        if str(uid) not in test_pred:
                            test_pred[str(uid)] = {}
                        traj_id = batch['traj_id'][i].cpu().long().numpy()[0]
                        current_longi = batch['current_longi'][i].cpu().numpy()
                        current_lati = batch['current_lati'][i].cpu().numpy()
                        coordinates = []
                        for longi, lati in zip(current_longi, current_lati):
                            coordinates.append((float(longi), float(lati)))
                        traj_len = batch['traj_len'][i].cpu().long().numpy()[0]
                        start_timestamp = batch['start_timestamp'][i].cpu().long().numpy()[0]
                        outputs = {}
                        outputs['coordinates'] = coordinates[:traj_len]
                        outputs['start_time'] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.localtime(start_timestamp))
                        outputs['truth'] = float(y_true[i].cpu().numpy()[0])
                        outputs['prediction'] = float(y_pred[i].cpu().numpy()[0])
                        test_pred[str(uid)][str(traj_id)] = outputs
            y_preds = np.concatenate(y_preds, axis=0)
            y_truths = np.concatenate(y_truths, axis=0)  # concatenate on the batch dimension
            if self.output_pred:
                filename = \
                    time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime(time.time())) + '_' \
                    + self.config['model'] + '_' + self.config['dataset'] + '_predictions.json'
                with open(os.path.join(self.evaluate_res_dir, filename), 'w') as f:
                    json.dump(test_pred, f)
            self.evaluator.clear()
            self.evaluator.collect({'y_true': torch.tensor(y_truths), 'y_pred': torch.tensor(y_preds)})
            test_result = self.evaluator.save_result(self.evaluate_res_dir)
            return test_result
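
When output_pred is enabled, evaluate() saves a JSON file under self.evaluate_res_dir that maps each user id to its trajectory ids, and each trajectory to a record with 'coordinates', 'start_time', 'truth' and 'prediction'. Below is a minimal sketch of reading that file back in a separate script; the helper name load_eta_predictions and the path handling are illustrative, not part of LibCity (the real file name is built from a timestamp, the model name and the dataset name).

import json
import os


def load_eta_predictions(evaluate_res_dir, filename):
    """Flatten a '<timestamp>_<model>_<dataset>_predictions.json' file into rows."""
    with open(os.path.join(evaluate_res_dir, filename)) as f:
        test_pred = json.load(f)
    rows = []
    for uid, trajectories in test_pred.items():
        for traj_id, record in trajectories.items():
            rows.append((uid, traj_id,
                         record['start_time'],          # departure time, ISO-8601 string
                         record['truth'],               # ground-truth travel time
                         record['prediction'],          # predicted travel time
                         len(record['coordinates'])))   # number of GPS points kept
    return rows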