Source code for libcity.pipeline.pipeline

import os
from ray import tune
from ray.tune.suggest.hyperopt import HyperOptSearch
from ray.tune.suggest.bayesopt import BayesOptSearch
from ray.tune.suggest.basic_variant import BasicVariantGenerator
from ray.tune.schedulers import FIFOScheduler, ASHAScheduler, MedianStoppingRule
from ray.tune.suggest import ConcurrencyLimiter
import json
import torch
import random
from libcity.config import ConfigParser
from libcity.data import get_dataset
from libcity.utils import get_executor, get_model, get_logger, ensure_dir, set_random_seed


def run_model(task=None, model_name=None, dataset_name=None, config_file=None,
              saved_model=True, train=True, other_args=None):
    """
    Args:
        task(str): task name
        model_name(str): model name
        dataset_name(str): dataset name
        config_file(str): config filename used to modify the pipeline's
            settings. The config file should be json.
        saved_model(bool): whether to save the model
        train(bool): whether to train the model
        other_args(dict): the rest of the parameters, which will be passed to the Config
    """
    # load config
    config = ConfigParser(task, model_name, dataset_name,
                          config_file, saved_model, train, other_args)
    exp_id = config.get('exp_id', None)
    if exp_id is None:
        # make a new experiment ID
        exp_id = int(random.SystemRandom().random() * 100000)
        config['exp_id'] = exp_id
    # logger
    logger = get_logger(config)
    logger.info('Begin pipeline, task={}, model_name={}, dataset_name={}, exp_id={}'
                .format(str(task), str(model_name), str(dataset_name), str(exp_id)))
    logger.info(config.config)
    # seed
    seed = config.get('seed', 0)
    set_random_seed(seed)
    # load the dataset
    dataset = get_dataset(config)
    # transform the data and split it into train/valid/test sets
    train_data, valid_data, test_data = dataset.get_data()
    data_feature = dataset.get_data_feature()
    # load the executor
    model_cache_file = './libcity/cache/{}/model_cache/{}_{}.m'.format(
        exp_id, model_name, dataset_name)
    model = get_model(config, data_feature)
    executor = get_executor(config, model, data_feature)
    # train
    if train or not os.path.exists(model_cache_file):
        executor.train(train_data, valid_data)
        if saved_model:
            executor.save_model(model_cache_file)
    else:
        executor.load_model(model_cache_file)
    # evaluate; the results will be saved under cache/evaluate_cache
    executor.evaluate(test_data)
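A minimal usage sketch of run_model. The task, model, and dataset names below are illustrative placeholders rather than values guaranteed by this module, and the call assumes it is made from the project root so the relative './libcity/cache/...' paths resolve.

# Hypothetical example (placeholder names); assumes execution from the
# LibCity project root so the relative cache paths resolve correctly.
from libcity.pipeline.pipeline import run_model

run_model(task='traffic_state_pred',        # assumed task name
          model_name='GRU',                 # assumed model name
          dataset_name='METR_LA',           # assumed dataset name
          other_args={'batch_size': 32})    # extra keys forwarded to Config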
def parse_search_space(space_file):
    search_space = {}
    if os.path.exists('./{}.json'.format(space_file)):
        with open('./{}.json'.format(space_file), 'r') as f:
            paras_dict = json.load(f)
            for name in paras_dict:
                paras_type = paras_dict[name]['type']
                if paras_type == 'uniform':
                    # name type lower upper
                    try:
                        search_space[name] = tune.uniform(
                            paras_dict[name]['lower'], paras_dict[name]['upper'])
                    except Exception:
                        raise TypeError('The space file does not meet the format requirements '
                                        'when parsing the uniform type.')
                elif paras_type == 'randn':
                    # name type mean sd
                    try:
                        search_space[name] = tune.randn(
                            paras_dict[name]['mean'], paras_dict[name]['sd'])
                    except Exception:
                        raise TypeError('The space file does not meet the format requirements '
                                        'when parsing the randn type.')
                elif paras_type == 'randint':
                    # name type lower upper
                    try:
                        if 'lower' not in paras_dict[name]:
                            search_space[name] = tune.randint(paras_dict[name]['upper'])
                        else:
                            search_space[name] = tune.randint(
                                paras_dict[name]['lower'], paras_dict[name]['upper'])
                    except Exception:
                        raise TypeError('The space file does not meet the format requirements '
                                        'when parsing the randint type.')
                elif paras_type == 'choice':
                    # name type list
                    try:
                        search_space[name] = tune.choice(paras_dict[name]['list'])
                    except Exception:
                        raise TypeError('The space file does not meet the format requirements '
                                        'when parsing the choice type.')
                elif paras_type == 'grid_search':
                    # name type list
                    try:
                        search_space[name] = tune.grid_search(paras_dict[name]['list'])
                    except Exception:
                        raise TypeError('The space file does not meet the format requirements '
                                        'when parsing the grid_search type.')
                else:
                    raise TypeError('The space file does not meet the format requirements: '
                                    'undefined parameter type.')
    else:
        raise FileNotFoundError('The space file {}.json is not found. Please make sure '
                                'the space file is a JSON file placed in the root '
                                'directory.'.format(space_file))
    return search_space
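As an illustration of the format parse_search_space expects, the sketch below writes a hypothetical sample_space.json and parses it. The parameter names (learning_rate, hidden_size, num_layers, dropout) are placeholders, not keys required by any particular LibCity model.

# Hypothetical search-space file; all parameter names are illustrative.
import json

sample_space = {
    'learning_rate': {'type': 'uniform', 'lower': 1e-4, 'upper': 1e-2},
    'hidden_size':   {'type': 'choice', 'list': [32, 64, 128]},
    'num_layers':    {'type': 'randint', 'lower': 1, 'upper': 4},
    'dropout':       {'type': 'grid_search', 'list': [0.0, 0.2, 0.5]}
}
with open('./sample_space.json', 'w') as f:
    json.dump(sample_space, f, indent=4)

# parse_search_space appends '.json' itself, so pass the stem only.
search_space = parse_search_space('sample_space')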
def hyper_parameter(task=None, model_name=None, dataset_name=None, config_file=None,
                    space_file=None, scheduler=None, search_alg=None, other_args=None,
                    num_samples=5, max_concurrent=1, cpu_per_trial=1, gpu_per_trial=1):
    """
    Use Ray Tune for hyperparameter tuning.

    Args:
        task(str): task name
        model_name(str): model name
        dataset_name(str): dataset name
        config_file(str): config filename used to modify the pipeline's
            settings. The config file should be json.
        space_file(str): the file which specifies the parameter search space
        scheduler(str): the trial scheduler which will be used in ray.tune.run
        search_alg(str): the search algorithm
        other_args(dict): the rest of the parameters, which will be passed to the Config
    """
    # load config
    experiment_config = ConfigParser(task, model_name, dataset_name,
                                     config_file=config_file, other_args=other_args)
    # logger
    logger = get_logger(experiment_config)
    logger.info(experiment_config.config)
    # check space_file
    if space_file is None:
        logger.error('the space_file should not be None when tuning hyperparameters.')
        exit(0)
    # seed
    seed = experiment_config.get('seed', 0)
    set_random_seed(seed)
    # parse space_file
    search_space = parse_search_space(space_file)
    # load dataset
    dataset = get_dataset(experiment_config)
    # get train/valid/test data
    train_data, valid_data, test_data = dataset.get_data()
    data_feature = dataset.get_data_feature()

    def train(config, checkpoint_dir=None, experiment_config=None,
              train_data=None, valid_data=None, data_feature=None):
        """Trainable function which meets the Ray Tune API.

        Args:
            config (dict): A dict of hyperparameters.
        """
        # modify experiment_config
        for key in config:
            if key in experiment_config:
                experiment_config[key] = config[key]
        experiment_config['hyper_tune'] = True
        logger = get_logger(experiment_config)
        logger.info('Begin pipeline, task={}, model_name={}, dataset_name={}'
                    .format(str(task), str(model_name), str(dataset_name)))
        logger.info('running parameters: ' + str(config))
        # load model
        model = get_model(experiment_config, data_feature)
        # load executor
        executor = get_executor(experiment_config, model, data_feature)
        # checkpoint by ray tune
        if checkpoint_dir:
            checkpoint = os.path.join(checkpoint_dir, 'checkpoint')
            executor.load_model(checkpoint)
        # train
        executor.train(train_data, valid_data)

    # init search algorithm and scheduler
    if search_alg == 'BasicSearch':
        algorithm = BasicVariantGenerator()
    elif search_alg == 'BayesOptSearch':
        algorithm = BayesOptSearch(metric='loss', mode='min')
        # add concurrency limit
        algorithm = ConcurrencyLimiter(algorithm, max_concurrent=max_concurrent)
    elif search_alg == 'HyperOpt':
        algorithm = HyperOptSearch(metric='loss', mode='min')
        # add concurrency limit
        algorithm = ConcurrencyLimiter(algorithm, max_concurrent=max_concurrent)
    else:
        raise ValueError('the search_alg is illegal.')
    if scheduler == 'FIFO':
        tune_scheduler = FIFOScheduler()
    elif scheduler == 'ASHA':
        tune_scheduler = ASHAScheduler()
    elif scheduler == 'MedianStoppingRule':
        tune_scheduler = MedianStoppingRule()
    else:
        raise ValueError('the scheduler is illegal.')
    # run ray tune
    ensure_dir('./libcity/cache/hyper_tune')
    result = tune.run(tune.with_parameters(train, experiment_config=experiment_config,
                                           train_data=train_data, valid_data=valid_data,
                                           data_feature=data_feature),
                      resources_per_trial={'cpu': cpu_per_trial, 'gpu': gpu_per_trial},
                      config=search_space, metric='loss', mode='min',
                      scheduler=tune_scheduler, search_alg=algorithm,
                      local_dir='./libcity/cache/hyper_tune', num_samples=num_samples)
    best_trial = result.get_best_trial("loss", "min", "last")
    logger.info("Best trial config: {}".format(best_trial.config))
    logger.info("Best trial final validation loss: {}".format(best_trial.last_result["loss"]))
    # save the best model
    best_path = os.path.join(best_trial.checkpoint.value, "checkpoint")
    model_state, optimizer_state = torch.load(best_path)
    model_cache_file = './libcity/cache/model_cache/{}_{}.m'.format(
        model_name, dataset_name)
    ensure_dir('./libcity/cache/model_cache')
    torch.save((model_state, optimizer_state), model_cache_file)
def objective_function(task=None, model_name=None, dataset_name=None, config_file=None,
                       saved_model=True, train=True, other_args=None, hyper_config_dict=None):
    """Run a single train-and-evaluate round and return the scores."""
    config = ConfigParser(task, model_name, dataset_name, config_file,
                          saved_model, train, other_args, hyper_config_dict)
    dataset = get_dataset(config)
    train_data, valid_data, test_data = dataset.get_data()
    data_feature = dataset.get_data_feature()
    model = get_model(config, data_feature)
    executor = get_executor(config, model, data_feature)
    best_valid_score = executor.train(train_data, valid_data)
    test_result = executor.evaluate(test_data)
    return {
        'best_valid_score': best_valid_score,
        'test_result': test_result
    }
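A sketch of how objective_function could be called directly, with hyper_config_dict overriding config keys for a single evaluation. The names and the overridden key are placeholders, and how the returned scores are consumed is up to the caller.

# Hypothetical call (placeholder names); hyper_config_dict is an assumed
# set of overrides, e.g. supplied by an external tuner.
from libcity.pipeline.pipeline import objective_function

result = objective_function(task='traffic_state_pred', model_name='GRU',
                            dataset_name='METR_LA', saved_model=False, train=True,
                            hyper_config_dict={'learning_rate': 0.001})
print(result['best_valid_score'], result['test_result'])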