Source code for federatedscope.core.workers.base_server

import abc
from federatedscope.core.workers import Worker


[docs]class BaseServer(Worker): def __init__(self, ID, state, config, model, strategy): super(BaseServer, self).__init__(ID, state, config, model, strategy) self.msg_handlers = dict() self.msg_handlers_str = dict()
[docs] def register_handlers(self, msg_type, callback_func, send_msg=[None]): """ To bind a message type with a handling function. Arguments: msg_type (str): The defined message type callback_func: The handling functions to handle the received \ message """ self.msg_handlers[msg_type] = callback_func self.msg_handlers_str[msg_type] = (callback_func.__name__, send_msg)
[docs] def _register_default_handlers(self): """ Register default handler dic to handle message, which includes \ sender, receiver, state, and content. More detail can be found in \ ``federatedscope.core.message``. Note: the default handlers to handle messages and related callback \ function are shown below: ============================ ================================== Message type Callback function ============================ ================================== ``join_in`` ``callback_funcs_for_join_in()`` ``join_in_info`` ``callback_funcs_for_join_in()`` ``model_para`` ``callback_funcs_model_para()`` ``metrics`` ``callback_funcs_for_metrics`` ============================ ================================== """ self.register_handlers('join_in', self.callback_funcs_for_join_in, [ 'assign_client_id', 'ask_for_join_in_info', 'address', 'model_para' ]) self.register_handlers('join_in_info', self.callback_funcs_for_join_in, ['address', 'model_para']) self.register_handlers('model_para', self.callback_funcs_model_para, ['model_para', 'evaluate', 'finish']) self.register_handlers('metrics', self.callback_funcs_for_metrics, ['converged'])
[docs] @abc.abstractmethod def run(self): """ To start the FL course, listen and handle messages (for distributed \ mode). """ raise NotImplementedError
[docs] @abc.abstractmethod def callback_funcs_model_para(self, message): """ The handling function for receiving model parameters, which triggers \ ``check_and_move_on`` (perform aggregation when enough feedback has \ been received). This handling function is widely used in various FL \ courses. Arguments: message: The received message. """ raise NotImplementedError
[docs] @abc.abstractmethod def callback_funcs_for_join_in(self, message): """ The handling function for receiving the join in information. The \ server might request for some information (such as \ ``num_of_samples``) if necessary, assign IDs for the servers. \ If all the clients have joined in, the training process will be \ triggered. Arguments: message: The received message """ raise NotImplementedError
[docs] @abc.abstractmethod def callback_funcs_for_metrics(self, message): """ The handling function for receiving the evaluation results, \ which triggers ``check_and_move_on`` (perform aggregation when \ enough feedback has been received). Arguments: message: The received message """ raise NotImplementedError