Source code for federatedscope.core.monitors.monitor
import copy
import json
import logging
import os
import gzip
import shutil
import datetime
from collections import defaultdict
from importlib import import_module
import numpy as np
from federatedscope.core.auxiliaries.logging import logline_2_wandb_dict
from federatedscope.core.monitors.metric_calculator import MetricCalculator
try:
import torch
except ImportError:
torch = None
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
global_all_monitors = [
] # used in standalone mode, to merge sys metric results for all workers
class Monitor(object):
"""
Provide the monitoring functionalities such as formatting the \
evaluation results into diverse metrics. \
Besides the prediction related performance, the monitor also can \
track efficiency related metrics for a worker
Args:
cfg: a cfg node object
monitored_object: object to be monitored
Attributes:
log_res_best: best ever seen results
outdir: output directory
use_wandb: whether use ``wandb``
wandb_online_track: whether use ``wandb`` to track online
monitored_object: object to be monitored
metric_calculator: metric calculator, /
see ``core.monitors.metric_calculator``
round_wise_update_key: key to decide which result of evaluation \
round is better
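
    Example:
        A minimal construction sketch (illustrative; ``cfg`` is assumed to
        be a fully populated FederatedScope config node and ``worker`` a
        hypothetical worker object):

        >>> monitor = Monitor(cfg, monitored_object=worker)  # doctest: +SKIP
        >>> monitor.track_upload_bytes(1024)  # doctest: +SKIP
        >>> monitor.track_download_bytes(2048)  # doctest: +SKIP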
"""
SUPPORTED_FORMS = ['weighted_avg', 'avg', 'fairness', 'raw']
def __init__(self, cfg, monitored_object=None):
self.cfg = cfg
self.log_res_best = {}
self.outdir = cfg.outdir
self.use_wandb = cfg.wandb.use
self.wandb_online_track = cfg.wandb.online_track if cfg.wandb.use \
else False
# self.use_tensorboard = cfg.use_tensorboard
self.monitored_object = monitored_object
self.metric_calculator = MetricCalculator(cfg.eval.metrics)
        # Determine whether larger values of the update key are better
self.round_wise_update_key = cfg.eval.best_res_update_round_wise_key
for mode in ['train', 'val', 'test']:
if mode in self.round_wise_update_key:
update_key = self.round_wise_update_key.split(f'{mode}_')[1]
assert update_key in self.metric_calculator.eval_metric, \
f'{update_key} not found in metrics.'
self.the_larger_the_better = self.metric_calculator.eval_metric[
update_key][1]
# ======= efficiency indicators of the worker to be monitored =======
        # leverages the flops counter provided by [fvcore](
# https://github.com/facebookresearch/fvcore)
self.total_model_size = 0 # model size used in the worker, in terms
# of number of parameters
self.flops_per_sample = 0 # average flops for forwarding each data
# sample
        self.flop_count = 0  # used to calculate the running mean for
# flops_per_sample
self.total_flops = 0 # total computation flops to convergence until
# current fl round
self.total_upload_bytes = 0 # total upload space cost in bytes
# until current fl round
self.total_download_bytes = 0 # total download space cost in bytes
# until current fl round
self.fl_begin_wall_time = datetime.datetime.now()
self.fl_end_wall_time = 0
# for the metrics whose names includes "convergence", 0 indicates
# the worker does not converge yet
# Note:
# 1) the convergence wall time is prone to fluctuations due to
# possible resource competition during FL courses
# 2) the global/local indicates whether the early stopping triggered
# with global-aggregation/local-training
self.global_convergence_round = 0 # total fl rounds to convergence
self.global_convergence_wall_time = 0
self.local_convergence_round = 0 # total fl rounds to convergence
self.local_convergence_wall_time = 0
if self.wandb_online_track:
global_all_monitors.append(self)
if self.use_wandb:
try:
import wandb
except ImportError:
                logger.error(
                    "cfg.wandb.use=True but the wandb package is not "
                    "installed")
exit()
    def eval(self, ctx):
"""
Evaluates the given context with ``metric_calculator``.
Args:
ctx: context of trainer, see ``core.trainers.context``
Returns:
Evaluation results.
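
        Example:
            A hedged sketch; ``ctx`` comes from a trainer and must already
            hold ground-truth labels and predictions, and the returned keys
            depend on ``cfg.eval.metrics``:

            >>> results = monitor.eval(ctx)  # doctest: +SKIP
            >>> 'test_acc' in results  # doctest: +SKIP
            True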
"""
results = self.metric_calculator.eval(ctx)
return results
    def global_converged(self):
"""
Calculate wall time and round when global convergence has been reached.
"""
self.global_convergence_wall_time = datetime.datetime.now(
) - self.fl_begin_wall_time
self.global_convergence_round = self.monitored_object.state
    def local_converged(self):
"""
Calculate wall time and round when local convergence has been reached.
"""
self.local_convergence_wall_time = datetime.datetime.now(
) - self.fl_begin_wall_time
self.local_convergence_round = self.monitored_object.state
    def finish_fl(self):
"""
When FL finished, write system metrics to file.
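
        Each call appends one JSON line to ``system_metrics.log``; an
        illustrative (not verbatim) record could look like::

            {"id": 1, "fl_end_time_minutes": 1.5,
             "total_model_size": 11181642, "total_flops": 0.0, ...}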
"""
self.fl_end_wall_time = datetime.datetime.now(
) - self.fl_begin_wall_time
system_metrics = self.get_sys_metrics()
sys_metric_f_name = os.path.join(self.outdir, "system_metrics.log")
with open(sys_metric_f_name, "a") as f:
f.write(json.dumps(system_metrics) + "\n")
    def get_sys_metrics(self, verbose=True):
        """
        Collect the currently tracked system metrics into a dict.
        """
system_metrics = {
"id": self.monitored_object.ID,
"fl_end_time_minutes": self.fl_end_wall_time.total_seconds() /
60 if isinstance(self.fl_end_wall_time, datetime.timedelta) else 0,
"total_model_size": self.total_model_size,
"total_flops": self.total_flops,
"total_upload_bytes": self.total_upload_bytes,
"total_download_bytes": self.total_download_bytes,
"global_convergence_round": self.global_convergence_round,
"local_convergence_round": self.local_convergence_round,
"global_convergence_time_minutes": self.
global_convergence_wall_time.total_seconds() / 60 if isinstance(
self.global_convergence_wall_time, datetime.timedelta) else 0,
"local_convergence_time_minutes": self.local_convergence_wall_time.
total_seconds() / 60 if isinstance(
self.local_convergence_wall_time, datetime.timedelta) else 0,
}
if verbose:
logger.info(
f"In worker #{self.monitored_object.ID}, the system-related "
f"metrics are: {str(system_metrics)}")
return system_metrics
    def merge_system_metrics_simulation_mode(self,
                                             file_io=True,
                                             from_global_monitors=False):
"""
        Average the system metrics recorded in ``system_metrics.log`` by \
        all workers
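
        A sketch of the appended records (illustrative values; the merged
        entries use the ids ``sys_avg`` and ``sys_std``)::

            {"id": "sys_avg", "sys_avg/total_upload_bytes": "1.5M", ...}
            {"id": "sys_std", "sys_std/total_upload_bytes": "0.02M", ...}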
"""
all_sys_metrics = defaultdict(list)
avg_sys_metrics = defaultdict()
std_sys_metrics = defaultdict()
if file_io:
sys_metric_f_name = os.path.join(self.outdir, "system_metrics.log")
if not os.path.exists(sys_metric_f_name):
                logger.warning(
                    "You have not tracked the workers' system metrics in "
                    "$outdir$/system_metrics.log, "
                    "we will skip the merging. Please check whether you "
                    "forgot to call monitor.finish_fl()")
return
with open(sys_metric_f_name, "r") as f:
                for line in f:
                    res = json.loads(line)
                    # all_sys_metrics is a defaultdict(list), so each
                    # worker's record can be appended directly
                    for k, v in res.items():
                        all_sys_metrics[k].append(v)
id_to_be_merged = all_sys_metrics["id"]
if len(id_to_be_merged) != len(set(id_to_be_merged)):
                logger.warning(
                    f"The sys_metric_file ({sys_metric_f_name}) contains "
                    f"duplicated tracked sys-results with these ids: "
                    f"{id_to_be_merged}. "
                    f"We will skip the merging as the merge is invalid. "
                    f"Please check whether you specified the same 'outdir' "
                    f"as that of another, older experiment.")
return
elif from_global_monitors:
for monitor in global_all_monitors:
res = monitor.get_sys_metrics(verbose=False)
                for k, v in res.items():
                    all_sys_metrics[k].append(v)
else:
            raise ValueError("file_io or from_global_monitors should be "
                             f"True, but got file_io={file_io}, "
                             f"from_global_monitors={from_global_monitors}")
for k, v in all_sys_metrics.items():
if k == "id":
avg_sys_metrics[k] = "sys_avg"
std_sys_metrics[k] = "sys_std"
else:
v = np.array(v).astype("float")
mean_res = np.mean(v)
std_res = np.std(v)
if "flops" in k or "bytes" in k or "size" in k:
mean_res = self.convert_size(mean_res)
std_res = self.convert_size(std_res)
avg_sys_metrics[f"sys_avg/{k}"] = mean_res
std_sys_metrics[f"sys_std/{k}"] = std_res
        logger.info(
            f"After merging the system metrics from all workers, we got avg:"
            f" {avg_sys_metrics}")
        logger.info(
            f"After merging the system metrics from all workers, we got std:"
            f" {std_sys_metrics}")
if file_io:
with open(sys_metric_f_name, "a") as f:
f.write(json.dumps(avg_sys_metrics) + "\n")
f.write(json.dumps(std_sys_metrics) + "\n")
if self.use_wandb and self.wandb_online_track:
try:
import wandb
# wandb.log(avg_sys_metrics)
# wandb.log(std_sys_metrics)
for k, v in avg_sys_metrics.items():
wandb.summary[k] = v
for k, v in std_sys_metrics.items():
wandb.summary[k] = v
except ImportError:
                logger.error(
                    "cfg.wandb.use=True but the wandb package is not "
                    "installed")
exit()
    def save_formatted_results(self,
                               formatted_res,
                               save_file_name="eval_results.log"):
"""
Save formatted results to a file.
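
        A hedged usage sketch (``formatted_res`` is typically the return
        value of ``format_eval_res``):

        >>> res = monitor.format_eval_res(
        ...     results, rnd=1, role='Client #1')  # doctest: +SKIP
        >>> monitor.save_formatted_results(res)  # doctest: +SKIP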
"""
line = str(formatted_res) + "\n"
if save_file_name != "":
with open(os.path.join(self.outdir, save_file_name),
"a") as outfile:
outfile.write(line)
if self.use_wandb and self.wandb_online_track:
try:
import wandb
exp_stop_normal = False
exp_stop_normal, log_res = logline_2_wandb_dict(
exp_stop_normal, line, self.log_res_best, raw_out=False)
wandb.log(log_res)
except ImportError:
                logger.error(
                    "cfg.wandb.use=True but the wandb package is not "
                    "installed")
exit()
    def finish_fed_runner(self, fl_mode=None):
"""
Finish the Fed runner.
"""
self.compress_raw_res_file()
if fl_mode == "standalone":
self.merge_system_metrics_simulation_mode()
if self.use_wandb and not self.wandb_online_track:
try:
import wandb
except ImportError:
                logger.error(
                    "cfg.wandb.use=True but the wandb package is not "
                    "installed")
exit()
from federatedscope.core.auxiliaries.logging import \
logfile_2_wandb_dict
with open(os.path.join(self.outdir, "eval_results.log"),
"r") as exp_log_f:
# track the prediction related performance
all_log_res, exp_stop_normal, last_line, log_res_best = \
logfile_2_wandb_dict(exp_log_f, raw_out=False)
for log_res in all_log_res:
wandb.log(log_res)
wandb.log(log_res_best)
# track the system related performance
sys_metric_f_name = os.path.join(self.outdir,
"system_metrics.log")
with open(sys_metric_f_name, "r") as f:
for line in f:
res = json.loads(line)
if res["id"] in ["sys_avg", "sys_std"]:
# wandb.log(res)
for k, v in res.items():
wandb.summary[k] = v
    def compress_raw_res_file(self):
"""
Compress the raw res file to be written to disk.
"""
old_f_name = os.path.join(self.outdir, "eval_results.raw")
if os.path.exists(old_f_name):
logger.info(
"We will compress the file eval_results.raw into a .gz file, "
"and delete the old one")
with open(old_f_name, 'rb') as f_in:
with gzip.open(old_f_name + ".gz", 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(old_f_name)
    def format_eval_res(self,
                        results,
                        rnd,
                        role=-1,
                        forms=None,
                        return_raw=False):
"""
Format the evaluation results from ``trainer.ctx.eval_results``
Args:
results (dict): a dict to store the evaluation results {metric:
value}
rnd (int|string): FL round
role (int|string): the output role
forms (list): format type
            return_raw (bool): whether to return the raw results instead
                of the formatted ones
Returns:
dict: round_formatted_results, a formatted results with \
different forms and roles
Note:
Example of return value:
```
{ \
'Role': 'Server #', \
'Round': 200, \
'Results_weighted_avg': { \
'test_avg_loss': 0.58, 'test_acc': 0.67, 'test_correct': \
3356, 'test_loss': 2892, 'test_total': 5000 \
}, \
'Results_avg': { \
'test_avg_loss': 0.57, 'test_acc': 0.67, 'test_correct': \
3356, 'test_loss': 2892, 'test_total': 5000 \
}, \
'Results_fairness': { \
'test_total': 33.99, 'test_correct': 27.185, \
'test_avg_loss_std': 0.433551, \
'test_avg_loss_bottom_decile': 0.356503, \
'test_avg_loss_top_decile': 1.212492, \
'test_avg_loss_min': 0.198317, 'test_avg_loss_max': 3.603567, \
'test_avg_loss_bottom10%': 0.276681, 'test_avg_loss_top10%': \
1.686649, \
'test_avg_loss_cos1': 0.8679, 'test_avg_loss_entropy': 5.1641, \
'test_loss_std': 13.686828, 'test_loss_bottom_decile': 11.8220, \
'test_loss_top_decile': 39.727236, 'test_loss_min': 7.337724, \
'test_loss_max': 100.899873, 'test_loss_bottom10%': 9.618685, \
'test_loss_top10%': 54.96769, 'test_loss_cos1': 0.880356, \
'test_loss_entropy': 5.175803, 'test_acc_std': 0.123823, \
'test_acc_bottom_decile': 0.676471, 'test_acc_top_decile': \
0.916667, \
'test_acc_min': 0.071429, 'test_acc_max': 0.972973, \
'test_acc_bottom10%': 0.527482, 'test_acc_top10%': 0.94486, \
'test_acc_cos1': 0.988134, 'test_acc_entropy': 5.283755 \
}, \
}
```
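
        As a worked illustration of the ``weighted_avg`` form (toy numbers):
        with per-client ``test_total = [10, 30]`` and ``test_acc = [0.8,
        0.6]``, the weighted average is ``(0.8 * 10 + 0.6 * 30) / 40 =
        0.65``, whereas the plain ``avg`` form would report ``0.7``.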
"""
if forms is None:
forms = ['weighted_avg', 'avg', 'fairness', 'raw']
round_formatted_results = {'Role': role, 'Round': rnd}
round_formatted_results_raw = {'Role': role, 'Round': rnd}
if 'group_avg' in forms: # have different format
# ({client_id: metrics})
new_results = {}
num_of_client_for_data = self.cfg.data.num_of_client_for_data
client_start_id = 1
for group_id, num_clients in enumerate(num_of_client_for_data):
if client_start_id > len(results):
break
group_res = copy.deepcopy(results[client_start_id])
num_div = num_clients - max(
0, client_start_id + num_clients - len(results) - 1)
for client_id in range(client_start_id,
client_start_id + num_clients):
if client_id > len(results):
break
for k, v in group_res.items():
if isinstance(v, dict):
for kk in v:
if client_id == client_start_id:
group_res[k][kk] /= num_div
else:
group_res[k][kk] += results[client_id][k][
kk] / num_div
else:
if client_id == client_start_id:
group_res[k] /= num_div
else:
group_res[k] += results[client_id][k] / num_div
new_results[group_id + 1] = group_res
client_start_id += num_clients
round_formatted_results['Results_group_avg'] = new_results
else:
for form in forms:
new_results = copy.deepcopy(results)
if not role.lower().startswith('server') or form == 'raw':
round_formatted_results_raw['Results_raw'] = new_results
elif form not in Monitor.SUPPORTED_FORMS:
continue
else:
for key in results.keys():
dataset_name = key.split("_")[0]
if f'{dataset_name}_total' not in results:
                            raise ValueError(
                                "Results to be formatted should include "
                                "the number of samples in the dict, "
                                f"with key = {dataset_name}_total")
else:
dataset_num = np.array(
results[f'{dataset_name}_total'])
                        if key in [
                                f'{dataset_name}_total',
                                f'{dataset_name}_correct'
                        ]:
                            new_results[key] = np.mean(new_results[key])
else:
all_res = np.array(copy.copy(results[key]))
if form == 'weighted_avg':
new_results[key] = np.sum(
np.array(new_results[key]) *
dataset_num) / np.sum(dataset_num)
if form == "avg":
new_results[key] = np.mean(new_results[key])
if form == "fairness" and all_res.size > 1:
# by default, log the std and decile
new_results.pop(
key,
None) # delete the redundant original one
all_res.sort()
new_results[f"{key}_std"] = np.std(
np.array(all_res))
new_results[f"{key}_bottom_decile"] = all_res[
all_res.size // 10]
new_results[f"{key}_top_decile"] = all_res[
all_res.size * 9 // 10]
# log more fairness metrics
# min and max
new_results[f"{key}_min"] = all_res[0]
new_results[f"{key}_max"] = all_res[-1]
# bottom and top 10%
new_results[f"{key}_bottom10%"] = np.mean(
all_res[:all_res.size // 10])
new_results[f"{key}_top10%"] = np.mean(
all_res[all_res.size * 9 // 10:])
# cosine similarity between the performance
# distribution and 1
new_results[f"{key}_cos1"] = np.mean(
all_res) / (np.sqrt(np.mean(all_res**2)))
# entropy of performance distribution
all_res_preprocessed = all_res + 1e-9
new_results[f"{key}_entropy"] = np.sum(
-all_res_preprocessed /
np.sum(all_res_preprocessed) * (np.log(
(all_res_preprocessed) /
np.sum(all_res_preprocessed))))
round_formatted_results[f'Results_{form}'] = new_results
with open(os.path.join(self.outdir, "eval_results.raw"),
"a") as outfile:
outfile.write(str(round_formatted_results_raw) + "\n")
return round_formatted_results_raw if return_raw else \
round_formatted_results
    def calc_model_metric(self, last_model, local_updated_models, rnd):
"""
        Args:
            last_model (dict): the state of the last round.
            local_updated_models (list): each element is (data_size, model).
            rnd (int): the current FL round.
Returns:
dict: model_metric_dict
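
        Note:
            For each metric name ``m`` in ``cfg.eval.monitoring``, a
            function named ``calc_m`` is looked up in
            ``federatedscope.core.monitors.metric_calculator`` and called
            as ``calc_m(last_model, local_updated_models)``.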
"""
model_metric_dict = {}
for metric in self.cfg.eval.monitoring:
func_name = f'calc_{metric}'
calc_metric = getattr(
import_module(
'federatedscope.core.monitors.metric_calculator'),
func_name)
metric_value = calc_metric(last_model, local_updated_models)
model_metric_dict[f'train_{metric}'] = metric_value
formatted_log = {
'Role': 'Server #',
'Round': rnd,
'Results_model_metric': model_metric_dict
}
logger.info(formatted_log)
return model_metric_dict
    def convert_size(self, size_bytes):
"""
Convert bytes to human-readable size.
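
        Example (illustrative; ``monitor`` is a constructed instance):
            >>> monitor.convert_size(512)  # doctest: +SKIP
            '512.0'
            >>> monitor.convert_size(2 * 1024 ** 2)  # doctest: +SKIP
            '2.0M'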
"""
import math
if size_bytes <= 0:
return str(size_bytes)
size_name = ("", "K", "M", "G", "T", "P", "E", "Z", "Y")
i = int(math.floor(math.log(size_bytes, 1024)))
p = math.pow(1024, i)
s = round(size_bytes / p, 2)
return f"{s}{size_name[i]}"
    def track_model_size(self, models):
"""
        Calculate the total model size given the models held by the \
        worker/trainer

        Args:
            models: torch.nn.Module or list of torch.nn.Module
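
        Example:
            A hedged sketch (requires ``torch``):

            >>> import torch.nn as nn
            >>> monitor.track_model_size(nn.Linear(10, 2))  # doctest: +SKIP
            >>> monitor.total_model_size  # doctest: +SKIP
            22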
"""
if self.total_model_size != 0:
            logger.warning(
                "the total_model_size is not zero. You may have already "
                "calculated the total_model_size before")
if not hasattr(models, '__iter__'):
models = [models]
for model in models:
assert isinstance(model, torch.nn.Module), \
f"the `model` should be type torch.nn.Module when " \
f"calculating its size, but got {type(model)}"
for name, para in model.named_parameters():
self.total_model_size += para.numel()
    def track_avg_flops(self, flops, sample_num=1):
"""
        Update the average flops for forwarding each data sample; \
        for most models and tasks the averaging is not needed, \
        as the input shape is fixed
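
        A worked sketch of the running mean (toy numbers): starting from
        ``flops_per_sample = 0`` and ``flop_count = 0``, calling
        ``track_avg_flops(100)`` and then ``track_avg_flops(200)`` yields
        ``(100 * 1 + 200) / (1 + 1) = 150`` flops per sample.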
"""
self.flops_per_sample = (self.flops_per_sample * self.flop_count +
flops) / (self.flop_count + sample_num)
        # advance the count by sample_num to keep the running mean consistent
        self.flop_count += sample_num
    def track_upload_bytes(self, bytes):
"""
Track the number of bytes uploaded.
"""
self.total_upload_bytes += bytes
    def track_download_bytes(self, bytes):
"""
Track the number of bytes downloaded.
"""
self.total_download_bytes += bytes
    def update_best_result(self, best_results, new_results, results_type):
"""
        Update the best evaluation results. \
        By default, the update is based on the validation loss with \
        ``round_wise_update_key="val_loss"``
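
        A hedged usage sketch (list-valued results are reduced with
        min/max for the ``*_best_individual`` result types):

        >>> best = {}
        >>> monitor.update_best_result(
        ...     best, {'val_loss': [0.5, 0.7], 'val_acc': [0.8, 0.6]},
        ...     results_type='client_best_individual')  # doctest: +SKIP
        >>> best  # doctest: +SKIP
        {'client_best_individual': {'val_loss': 0.5, 'val_acc': 0.8}}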
"""
update_best_this_round = False
if not isinstance(new_results, dict):
raise ValueError(
f"update best results require `results` a dict, but got"
f" {type(new_results)}")
else:
if results_type not in best_results:
best_results[results_type] = dict()
best_result = best_results[results_type]
# update different keys separately: the best values can be in
# different rounds
if self.round_wise_update_key is None:
for key in new_results:
cur_result = new_results[key]
if 'loss' in key or 'std' in key: # the smaller,
# the better
if results_type in [
"client_best_individual",
"unseen_client_best_individual"
]:
cur_result = min(cur_result)
if key not in best_result or cur_result < best_result[
key]:
best_result[key] = cur_result
update_best_this_round = True
elif 'acc' in key: # the larger, the better
if results_type in [
"client_best_individual",
"unseen_client_best_individual"
]:
cur_result = max(cur_result)
if key not in best_result or cur_result > best_result[
key]:
best_result[key] = cur_result
update_best_this_round = True
else:
# unconcerned metric
pass
# update different keys round-wise: if find better
# round_wise_update_key, update others at the same time
else:
found_round_wise_update_key = False
sorted_keys = []
for key in new_results:
# TODO: fix `in` condition
if self.round_wise_update_key in key:
sorted_keys.insert(0, key)
found_round_wise_update_key = key
else:
sorted_keys.append(key)
if not found_round_wise_update_key:
raise ValueError(
"Your specified eval.best_res_update_round_wise_key "
"is not in target results, "
"use another key or check the name. \n"
f"Got eval.best_res_update_round_wise_key"
f"={self.round_wise_update_key}, "
f"the keys of results are {list(new_results.keys())}")
# the first key must be the `round_wise_update_key`,
# `update_best_this_round` should be set while evaluating the
# first key, so we can check whether `update_best_this_round`
# firstly
cur_result = new_results[found_round_wise_update_key]
if self.the_larger_the_better:
# The larger, the better
if results_type in [
"client_best_individual",
"unseen_client_best_individual"
]:
cur_result = max(cur_result)
if found_round_wise_update_key not in best_result or\
cur_result > best_result[
found_round_wise_update_key]:
best_result[found_round_wise_update_key] = cur_result
update_best_this_round = True
else:
# The smaller, the better
if results_type in [
"client_best_individual",
"unseen_client_best_individual"
]:
cur_result = min(cur_result)
if found_round_wise_update_key not in best_result or \
cur_result < best_result[
found_round_wise_update_key]:
best_result[found_round_wise_update_key] = cur_result
update_best_this_round = True
# update other metrics only if update_best_this_round is True
if update_best_this_round:
for key in sorted_keys[1:]:
cur_result = new_results[key]
if results_type in [
"client_best_individual",
"unseen_client_best_individual"
]:
                            # Determine whether larger values are better
                            # for this metric
for mode in ['train', 'val', 'test']:
if mode in key:
_key = key.split(f'{mode}_')[1]
if self.metric_calculator.eval_metric[
_key][1]:
cur_result = max(cur_result)
else:
cur_result = min(cur_result)
best_result[key] = cur_result
if update_best_this_round:
line = f"Find new best result: {best_results}"
            logger.info(line)
if self.use_wandb and self.wandb_online_track:
try:
import wandb
exp_stop_normal = False
exp_stop_normal, log_res = logline_2_wandb_dict(
exp_stop_normal,
line,
self.log_res_best,
raw_out=False)
# wandb.log(self.log_res_best)
for k, v in self.log_res_best.items():
wandb.summary[k] = v
except ImportError:
                    logger.error(
                        "cfg.wandb.use=True but the wandb package is not "
                        "installed")
exit()
    def add_items_to_best_result(self, best_results, new_results,
                                 results_type):
"""
        Add a new key-value pair (results_type: new_results) to best_results
"""
best_results[results_type] = new_results