import os
import logging
from copy import deepcopy
import threading
import math
import ConfigSpace as CS
import yaml
import numpy as np
from federatedscope.core.auxiliaries.utils import setup_seed
from federatedscope.core.auxiliaries.data_builder import get_data
from federatedscope.core.auxiliaries.worker_builder import get_client_cls, \
get_server_cls
from federatedscope.core.auxiliaries.runner_builder import get_runner
from federatedscope.autotune.utils import parse_search_space, \
config2cmdargs, config2str, summarize_hpo_results, log2wandb, eval_in_fs
logger = logging.getLogger(__name__)


class TrialExecutor(threading.Thread):
"""This class is responsible for executing the FL procedure with
a given trial configuration in another thread.
"""
def __init__(self, cfg_idx, signal, returns, trial_config, client_cfgs):
threading.Thread.__init__(self)
self._idx = cfg_idx
self._signal = signal
self._returns = returns
self._trial_cfg = trial_config
self._client_cfgs = client_cfgs

    def run(self):
setup_seed(self._trial_cfg.seed)
data, modified_config = get_data(config=self._trial_cfg.clone())
self._trial_cfg.merge_from_other_cfg(modified_config)
self._trial_cfg.freeze()
Fed_runner = get_runner(data=data,
server_class=get_server_cls(self._trial_cfg),
client_class=get_client_cls(self._trial_cfg),
config=self._trial_cfg.clone(),
client_configs=self._client_cfgs)
results = Fed_runner.run()
key1, key2 = self._trial_cfg.hpo.metric.split('.')
self._returns['perf'] = results[key1][key2]
self._returns['cfg_idx'] = self._idx
self._signal.set()
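# A minimal sketch of the TrialExecutor handshake used by
# ModelFreeBase._evaluate below (the variable names here are illustrative):
# the caller keeps the Event cleared while a trial is running; the thread
# writes 'perf' and 'cfg_idx' into the shared dict and sets the Event when
# done, so a worker slot whose Event is set is free to take the next config.
#
#     signal, returns = threading.Event(), dict()
#     trial = TrialExecutor(0, signal, returns, trial_cfg, client_cfgs=None)
#     trial.start()
#     signal.wait()  # block until this trial finishes
#     print(returns['cfg_idx'], returns['perf'])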


def get_scheduler(init_cfg, client_cfgs=None):
"""To instantiate a scheduler object for conducting HPO
Arguments:
init_cfg: configuration
client_cfgs: client-specific configuration
"""
if init_cfg.hpo.scheduler in [
'sha', 'rs', 'bo_kde', 'bohb', 'hb', 'bo_gp', 'bo_rf'
]:
scheduler = SuccessiveHalvingAlgo(init_cfg, client_cfgs)
# elif init_cfg.hpo.scheduler == 'pbt':
# scheduler = PBT(init_cfg)
    elif init_cfg.hpo.scheduler.startswith('wrap'):
        scheduler = SHAWrapFedex(init_cfg, client_cfgs)
return scheduler
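# A minimal sketch of how get_scheduler is typically driven (assuming the
# global_cfg entry point from federatedscope.core.configs.config and a YAML
# file 'hpo.yaml'; both names are illustrative of a typical setup):
#
#     from federatedscope.core.configs.config import global_cfg
#
#     init_cfg = global_cfg.clone()
#     init_cfg.merge_from_file('hpo.yaml')
#     scheduler = get_scheduler(init_cfg, client_cfgs=None)
#     summary = scheduler.optimize()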


class Scheduler(object):
"""The base class for describing HPO algorithms
"""
def __init__(self, cfg, client_cfgs=None):
"""
Arguments:
cfg (federatedscope.core.configs.config.CN): dict \
like object, where each key-value pair corresponds to a \
field and its choices.
client_cfgs: client-specific configuration
"""
self._cfg = cfg
self._client_cfgs = client_cfgs
# Create hpo working folder
os.makedirs(self._cfg.hpo.working_folder, exist_ok=True)
self._search_space = parse_search_space(self._cfg.hpo.ss)
self._init_configs = self._setup()
logger.info(self._init_configs)

    def _setup(self):
"""Prepare the initial configurations based on the search space.
"""
raise NotImplementedError

    def _evaluate(self, configs):
"""To evaluate (i.e., conduct the FL procedure) for a given
collection of configurations.
"""
raise NotImplementedError

    def optimize(self):
"""To optimize the hyperparameters, that is, executing the HPO
algorithm and then returning the results.
"""
raise NotImplementedError


class ModelFreeBase(Scheduler):
"""To attempt a collection of configurations exhaustively.
"""

    def _setup(self):
self._search_space.seed(self._cfg.seed + 19)
return [
cfg.get_dictionary()
for cfg in self._search_space.sample_configuration(
size=self._cfg.hpo.init_cand_num)
]

    def _evaluate(self, configs):
if self._cfg.hpo.num_workers:
# execute FL in parallel by multi-threading
flags = [
threading.Event() for _ in range(self._cfg.hpo.num_workers)
]
for i in range(len(flags)):
flags[i].set()
threads = [None for _ in range(len(flags))]
thread_results = [dict() for _ in range(len(flags))]
perfs = [None for _ in range(len(configs))]
for i, config in enumerate(configs):
available_worker = 0
while not flags[available_worker].is_set():
available_worker = (available_worker + 1) % len(threads)
if thread_results[available_worker]:
completed_trial_results = thread_results[available_worker]
cfg_idx = completed_trial_results['cfg_idx']
perfs[cfg_idx] = completed_trial_results['perf']
logger.info(
"Evaluate the {}-th config {} and get performance {}".
format(cfg_idx, configs[cfg_idx], perfs[cfg_idx]))
thread_results[available_worker].clear()
trial_cfg = self._cfg.clone()
trial_cfg.merge_from_list(config2cmdargs(config))
flags[available_worker].clear()
trial = TrialExecutor(i, flags[available_worker],
thread_results[available_worker],
trial_cfg, self._client_cfgs)
trial.start()
threads[available_worker] = trial
for i in range(len(flags)):
if not flags[i].is_set():
threads[i].join()
for i in range(len(thread_results)):
if thread_results[i]:
completed_trial_results = thread_results[i]
cfg_idx = completed_trial_results['cfg_idx']
perfs[cfg_idx] = completed_trial_results['perf']
# TODO: Support num_worker in WandB
logger.info(
"Evaluate the {}-th config {} and get performance {}".
format(cfg_idx, configs[cfg_idx], perfs[cfg_idx]))
thread_results[i].clear()
else:
tmp_configs = []
perfs = [None] * len(configs)
for i, config in enumerate(configs):
tmp_configs.append(config)
trial_cfg = self._cfg.clone()
trial_cfg.merge_from_list(config2cmdargs(config))
results = eval_in_fs(cfg=trial_cfg,
client_cfgs=self._client_cfgs)
key1, key2 = trial_cfg.hpo.metric.split('.')
perfs[i] = results[key1][key2]
logger.info(
"Evaluate the {}-th config {} and get performance {}".
format(i, config, perfs[i]))
if self._cfg.wandb.use:
tmp_results = \
summarize_hpo_results(tmp_configs,
perfs,
white_list=set(
self._search_space.keys()),
desc=self._cfg.hpo.larger_better,
is_sorted=False)
log2wandb(i, config, results, trial_cfg, tmp_results)
return perfs

    def optimize(self):
perfs = self._evaluate(self._init_configs)
results = summarize_hpo_results(self._init_configs,
perfs,
white_list=set(
self._search_space.keys()),
desc=self._cfg.hpo.larger_better,
use_wandb=self._cfg.wandb.use)
logger.info(
"========================== HPO Final ==========================")
logger.info("\n{}".format(results))
results.to_csv(
os.path.join(self._cfg.hpo.working_folder, 'results.csv'))
logger.info("====================================================")
return results


class IterativeScheduler(ModelFreeBase):
"""The base class for HPO algorithms that divide the whole optimization
procedure into iterations.
"""

    def _setup(self):
self._stage = 0
return super(IterativeScheduler, self)._setup()

    def _stop_criterion(self, configs, last_results):
"""To determine whether the algorithm should be terminated.
Arguments:
configs (list): each element is a trial configuration.
last_results (DataFrame): each row corresponds to a specific
configuration as well as its latest performance.
:returns: whether to terminate.
:rtype: bool
"""
raise NotImplementedError

    def _iteration(self, configs):
"""To evaluate the given collection of configurations at this stage.
Arguments:
configs (list): each element is a trial configuration.
:returns: the performances of the given configurations.
:rtype: list
"""
perfs = self._evaluate(configs)
return perfs

    def _generate_next_population(self, configs, perfs):
"""To generate the configurations for the next stage.
Arguments:
configs (list): the configurations of last stage.
perfs (list): their corresponding performances.
:returns: configuration for the next stage.
:rtype: list
"""
raise NotImplementedError

    def optimize(self):
current_configs = deepcopy(self._init_configs)
last_results = None
while not self._stop_criterion(current_configs, last_results):
current_perfs = self._iteration(current_configs)
last_results = summarize_hpo_results(
current_configs,
current_perfs,
white_list=set(self._search_space.keys()),
desc=self._cfg.hpo.larger_better,
use_wandb=self._cfg.wandb.use)
self._stage += 1
logger.info(
"========================== Stage{} =========================="
.format(self._stage))
logger.info("\n{}".format(last_results))
last_results.to_csv(
os.path.join(self._cfg.hpo.working_folder, 'results.csv'))
logger.info("====================================================")
current_configs = self._generate_next_population(
current_configs, current_perfs)
return current_configs


class SuccessiveHalvingAlgo(IterativeScheduler):
    """Successive Halving Algorithm (SHA) tailored to the FL setting, where,
    in each iteration, only a limited number of communication rounds are
    allowed for each trial.
    """

    def _setup(self):
init_configs = super(SuccessiveHalvingAlgo, self)._setup()
for trial_cfg in init_configs:
trial_cfg['federate.save_to'] = os.path.join(
self._cfg.hpo.working_folder,
"{}.pth".format(config2str(trial_cfg)))
if self._cfg.hpo.sha.budgets:
for trial_cfg in init_configs:
rnd = min(
self._cfg.hpo.sha.budgets[0] *
self._cfg.hpo.sha.elim_rate**self._stage,
self._cfg.hpo.sha.budgets[1])
trial_cfg['federate.total_round_num'] = rnd
trial_cfg['eval.freq'] = rnd
return init_configs

    def _stop_criterion(self, configs, last_results):
return len(configs) <= 1

    def _generate_next_population(self, configs, perfs):
indices = [(i, val) for i, val in enumerate(perfs)]
indices.sort(key=lambda x: x[1], reverse=self._cfg.hpo.larger_better)
next_population = [
configs[tp[0]] for tp in
indices[:math.
ceil(float(len(indices)) / self._cfg.hpo.sha.elim_rate)]
]
for trial_cfg in next_population:
if 'federate.restore_from' not in trial_cfg:
trial_cfg['federate.restore_from'] = trial_cfg[
'federate.save_to']
rnd = min(
self._cfg.hpo.sha.budgets[0] *
self._cfg.hpo.sha.elim_rate**self._stage,
self._cfg.hpo.sha.budgets[1])
if self._cfg.hpo.sha.budgets and rnd < \
self._cfg.hpo.sha.budgets[1]:
trial_cfg['federate.total_round_num'] = rnd
trial_cfg['eval.freq'] = rnd
return next_population
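# A worked example of the halving schedule implemented above (all numbers are
# illustrative): with hpo.init_cand_num = 27, hpo.sha.elim_rate = 3 and
# hpo.sha.budgets = [2, 50], each stage keeps the top ceil(n / 3) trials,
# i.e. 27 -> 9 -> 3 -> 1, and every survivor resumes from its checkpoint
# (federate.restore_from); as long as min(2 * 3**stage, 50) stays below the
# 50-round cap, federate.total_round_num is raised to that value, giving
# 2, 6 and 18 rounds over the three evaluated stages.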


class SHAWrapFedex(SuccessiveHalvingAlgo):
    """This SHA is customized as a wrapper for the FedEx algorithm."""
def _make_local_perturbation(self, config):
neighbor = dict()
for k in config:
if 'fedex' in k or 'fedopt' in k or k in [
'federate.save_to', 'federate.total_round_num', 'eval.freq'
]:
# a workaround
continue
hyper = self._search_space.get(k)
if isinstance(hyper, CS.UniformFloatHyperparameter):
lb, ub = hyper.lower, hyper.upper
diameter = self._cfg.hpo.table.eps * (ub - lb)
new_val = (config[k] -
0.5 * diameter) + np.random.uniform() * diameter
neighbor[k] = float(np.clip(new_val, lb, ub))
elif isinstance(hyper, CS.UniformIntegerHyperparameter):
lb, ub = hyper.lower, hyper.upper
diameter = self._cfg.hpo.table.eps * (ub - lb)
new_val = round(
float((config[k] - 0.5 * diameter) +
np.random.uniform() * diameter))
neighbor[k] = int(np.clip(new_val, lb, ub))
elif isinstance(hyper, CS.CategoricalHyperparameter):
if len(hyper.choices) == 1:
neighbor[k] = config[k]
else:
threshold = self._cfg.hpo.table.eps * len(
hyper.choices) / (len(hyper.choices) - 1)
rn = np.random.uniform()
new_val = np.random.choice(
hyper.choices) if rn <= threshold else config[k]
if type(new_val) in [np.int32, np.int64]:
neighbor[k] = int(new_val)
elif type(new_val) in [np.float32, np.float64]:
neighbor[k] = float(new_val)
else:
neighbor[k] = str(new_val)
else:
raise TypeError("Value of {} has an invalid type {}".format(
k, type(config[k])))
return neighbor
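    # A small numeric illustration of the perturbation above (values are
    # illustrative): for a UniformFloatHyperparameter with bounds
    # [0.01, 1.0] and hpo.table.eps = 0.1, the diameter is
    # 0.1 * (1.0 - 0.01) = 0.099, so a current value of 0.5 is resampled
    # uniformly from [0.4505, 0.5495] and clipped back into the bounds; a
    # categorical value is resampled from its choices with probability
    # eps * K / (K - 1), where K is the number of choices.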

    def _setup(self):
# self._cache_yaml()
init_configs = super(SHAWrapFedex, self)._setup()
new_init_configs = []
for idx, trial_cfg in enumerate(init_configs):
arms = dict(("arm{}".format(1 + j),
self._make_local_perturbation(trial_cfg))
for j in range(self._cfg.hpo.table.num - 1))
arms['arm0'] = dict(
(k, v) for k, v in trial_cfg.items() if k in arms['arm1'])
with open(
os.path.join(self._cfg.hpo.working_folder,
f'{idx}_tmp_grid_search_space.yaml'),
'w') as f:
yaml.dump(arms, f)
new_trial_cfg = dict()
for k in trial_cfg:
if k not in arms['arm0']:
new_trial_cfg[k] = trial_cfg[k]
new_trial_cfg['hpo.table.idx'] = idx
new_trial_cfg['hpo.fedex.ss'] = os.path.join(
self._cfg.hpo.working_folder,
f"{new_trial_cfg['hpo.table.idx']}_tmp_grid_search_space.yaml")
new_trial_cfg['federate.save_to'] = os.path.join(
self._cfg.hpo.working_folder, "idx_{}.pth".format(idx))
new_init_configs.append(new_trial_cfg)
self._search_space.add_hyperparameter(
CS.CategoricalHyperparameter("hpo.table.idx",
choices=list(
range(len(new_init_configs)))))
return new_init_configs
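# For reference, each per-trial YAML written by SHAWrapFedex._setup roughly
# looks like the following (hyperparameter names and values are purely
# illustrative): arm0 holds the sampled configuration restricted to the
# perturbable keys, and the remaining hpo.table.num - 1 arms are its local
# perturbations.
#
#     arm0:
#       train.optimizer.lr: 0.05
#       train.local_update_steps: 2
#     arm1:
#       train.optimizer.lr: 0.047
#       train.local_update_steps: 3
#     ...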
# TODO: refactor PBT to enable async parallel
# class PBT(IterativeScheduler):
# """Population-based training (the full paper "Population Based Training
# of Neural Networks" can be found at https://arxiv.org/abs/1711.09846)
# tailored to FL setting, where, in each iteration, just a limited number
# of communication rounds are allowed for each trial (We will provide the
# asynchronous version later).
# """
# def _setup(self, raw_search_space):
# _ = super(PBT, self)._setup(raw_search_space)
#
# if global_cfg.hpo.init_strategy == 'random':
# init_configs = random_search(
# raw_search_space,
# sample_size=global_cfg.hpo.sha.elim_rate**
# global_cfg.hpo.sha.elim_round_num)
# elif global_cfg.hpo.init_strategy == 'grid':
# init_configs = grid_search(raw_search_space, \
# sample_size=global_cfg.hpo.sha.elim_rate \
# **global_cfg.hpo.sha.elim_round_num)
# else:
# raise ValueError(
#             "SHA needs to use random/grid search to pick {} configs "
#             "from the search space as initial candidates, but `{}` is "
#             "specified as `hpo.init_strategy`"
# .format(
# global_cfg.hpo.sha.elim_rate**
# global_cfg.hpo.sha.elim_round_num,
# global_cfg.hpo.init_strategy))
#
# for trial_cfg in init_configs:
# trial_cfg['federate.save_to'] = os.path.join(
# global_cfg.hpo.working_folder,
# "{}.pth".format(config2str(trial_cfg)))
#
# return init_configs
#
# def _stop_criterion(self, configs, last_results):
# if last_results is not None:
# if (global_cfg.hpo.larger_better
# and last_results.iloc[0]['performance'] >=
# global_cfg.hpo.pbt.perf_threshold) or (
# (not global_cfg.hpo.larger_better)
# and last_results.iloc[0]['performance'] <=
# global_cfg.hpo.pbt.perf_threshold):
# return True
# return self._stage >= global_cfg.hpo.pbt.max_stage
#
# def _generate_next_population(self, configs, perfs):
# next_generation = []
# for i in range(len(configs)):
# new_cfg = deepcopy(configs[i])
# # exploit
# j = np.random.randint(len(configs))
# if i != j and (
# (global_cfg.hpo.larger_better and perfs[j] > perfs[i]) or
# ((not global_cfg.hpo.larger_better) and perfs[j] < perfs[i])):
# new_cfg['federate.restore_from'] = configs[j][
# 'federate.save_to']
# # explore
# for k in new_cfg:
# if isinstance(new_cfg[k], float):
#                     # according to the exploration strategy of the
#                     # PBT paper
# new_cfg[k] *= float(np.random.choice([0.8, 1.2]))
# else:
# new_cfg['federate.restore_from'] = configs[i][
# 'federate.save_to']
#
# # update save path
# tmp_cfg = dict()
# for k in new_cfg:
# if k in self._original_search_space:
# tmp_cfg[k] = new_cfg[k]
# new_cfg['federate.save_to'] = os.path.join(
# global_cfg.hpo.working_folder,
# "{}.pth".format(config2str(tmp_cfg)))
#
# next_generation.append(new_cfg)
#
# return next_generation