# Source code for federatedscope.core.aggregators.bulyan_aggregator

import copy
import torch
from federatedscope.core.aggregators import ClientsAvgAggregator


class BulyanAggregator(ClientsAvgAggregator):
    """
    Implementation of Bulyan refers to `The Hidden Vulnerability of
    Distributed Learning in Byzantium` [Mhamdi et al., 2018]
    (http://proceedings.mlr.press/v80/mhamdi18a/mhamdi18a.pdf)

    It combines the MultiKrum aggregator and the trimmed-mean aggregator.
    """
    def __init__(self, model=None, device='cpu', config=None):
        """
        Arguments:
            model: the global model whose ``state_dict`` serves as the
                initial point for aggregation.
            device: device identifier, e.g. ``'cpu'``.
            config: global config object; this aggregator reads
                ``aggregator.byzantine_node_num`` and
                ``federate.sample_client_rate`` from it.
        """
        super(BulyanAggregator, self).__init__(model, device, config)
        self.byzantine_node_num = config.aggregator.byzantine_node_num
        self.sample_client_rate = config.federate.sample_client_rate
        # Bulyan requires n >= 4f + 3 clients to tolerate f Byzantine nodes.
        assert 4 * self.byzantine_node_num + 3 <= config.federate.client_num
[docs] def aggregate(self, agg_info): """ To preform aggregation with Median aggregation rule Arguments: agg_info (dict): the feedbacks from clients :returns: the aggregated results :rtype: dict """ models = agg_info["client_feedback"] avg_model = self._aggre_with_bulyan(models) updated_model = copy.deepcopy(avg_model) init_model = self.model.state_dict() for key in avg_model: updated_model[key] = init_model[key] + avg_model[key] return updated_model
def _calculate_distance(self, model_a, model_b): """ Calculate the Euclidean distance between two given model para delta """ distance = 0.0 for key in model_a: if isinstance(model_a[key], torch.Tensor): model_a[key] = model_a[key].float() model_b[key] = model_b[key].float() else: model_a[key] = torch.FloatTensor(model_a[key]) model_b[key] = torch.FloatTensor(model_b[key]) distance += torch.dist(model_a[key], model_b[key], p=2) return distance def _calculate_score(self, models): """ Calculate Krum scores """ model_num = len(models) closest_num = model_num - self.byzantine_node_num - 2 distance_matrix = torch.zeros(model_num, model_num) for index_a in range(model_num): for index_b in range(index_a, model_num): if index_a == index_b: distance_matrix[index_a, index_b] = float('inf') else: distance_matrix[index_a, index_b] = distance_matrix[ index_b, index_a] = self._calculate_distance( models[index_a], models[index_b]) sorted_distance = torch.sort(distance_matrix)[0] krum_scores = torch.sum(sorted_distance[:, :closest_num], axis=-1) return krum_scores def _aggre_with_bulyan(self, models): ''' Apply MultiKrum to select \theta (\theta <= client_num- 2*self.byzantine_node_num) local models ''' init_model = self.model.state_dict() global_update = copy.deepcopy(init_model) models_para = [each_model[1] for each_model in models] krum_scores = self._calculate_score(models_para) index_order = torch.sort(krum_scores)[1].numpy() reliable_models = list() for number, index in enumerate(index_order): if number < len(models) - int( 2 * self.sample_client_rate * self.byzantine_node_num): reliable_models.append(models[index]) ''' Sort parameter for each coordinate of the rest \theta reliable local models, and find \gamma (gamma<\theta-2*self.byzantine_num) parameters closest to the median to perform averaging ''' exluded_num = int(self.sample_client_rate * self.byzantine_node_num) gamma = len(reliable_models) - 2 * exluded_num for key in init_model: temp = torch.stack( 
[each_model[1][key] for each_model in reliable_models], 0) pos_largest, _ = torch.topk(temp, exluded_num, 0) neg_smallest, _ = torch.topk(-temp, exluded_num, 0) new_stacked = torch.cat([temp, -pos_largest, neg_smallest]).sum(0).float() new_stacked /= gamma global_update[key] = new_stacked return global_update