"""Source code for federatedscope.mf.dataset.movielens."""

import os
import pickle
import logging

from torchvision.datasets.utils import check_integrity, \
    download_and_extract_archive, calculate_md5
import pandas as pd
from tqdm import tqdm
import scipy.sparse as sp
from numpy.random import shuffle
from scipy.sparse import coo_matrix
from scipy.sparse import csc_matrix
import numpy as np

# Module-level logger; configuration is left to the application.
logger = logging.getLogger(__name__)


class VMFDataset:
    """Mixin for matrix factorization datasets in vertical federated learning.

    Columns (items) of the rating matrix are partitioned across clients, so
    every client observes all users but only its own subset of items.
    """
    def _split_n_clients_rating(self, ratings: csc_matrix, num_client: int,
                                test_portion: float):
        """Shard ``ratings`` by item into ``num_client`` parties.

        Returns a dict mapping client id (1..num_client) to
        ``{"train": ..., "test": ...}`` sparse matrices; id 0 is the server,
        which holds all shards stacked back together. The result is also
        pickled to ``self.processed_data``.
        """
        # Randomly permute the item ids, then deal them out to the clients.
        permuted_items = np.arange(self.n_item)
        shuffle(permuted_items)
        item_chunks = np.array_split(permuted_items, num_client)

        data = {}
        per_client_train = []
        per_client_test = []
        for idx, item_ids in tqdm(enumerate(item_chunks)):
            # Vertical split: each client keeps the full user axis.
            chunk = ratings[:, item_ids]
            train_part, test_part = self._split_train_test_ratings(
                chunk, test_portion)
            data[idx + 1] = {"train": train_part, "test": test_part}
            per_client_train.append(train_part)
            per_client_test.append(test_part)

        # Server (client id 0) holds the union of every client's shard.
        data[0] = {
            "train": sp.hstack(per_client_train).tocsc(),
            "test": sp.hstack(per_client_test).tocsc()
        }
        with open(self.processed_data, 'wb') as f:
            pickle.dump(data, f)
        return data
class HMFDataset:
    """Mixin for matrix factorization datasets in horizontal federated learning.

    Rows (users) of the rating matrix are partitioned across clients, so
    every client observes all items but only its own subset of users.
    """
    def _split_n_clients_rating(self, ratings: csc_matrix, num_client: int,
                                test_portion: float):
        """Shard ``ratings`` by user into ``num_client`` parties.

        Args:
            ratings: full user-by-item rating matrix.
            num_client: number of client shards to create.
            test_portion: fraction of each shard's ratings held out for test.

        Returns:
            dict: client id (1..num_client) -> ``{"train": ..., "test": ...}``
            sparse matrices; id 0 is the server holding the stacked union.
            The result is also pickled to ``self.processed_data``.
        """
        id_user = np.arange(self.n_user)
        shuffle(id_user)
        users_per_client = np.array_split(id_user, num_client)
        data = dict()
        train_ratings_all, test_ratings_all = [], []
        # Fixed local name (was misspelled ``cliendId``), matching VMFDataset.
        for client_id, users in tqdm(enumerate(users_per_client)):
            # Horizontal split: each client keeps the full item axis.
            client_ratings = ratings[users, :]
            train_ratings, test_ratings = self._split_train_test_ratings(
                client_ratings, test_portion)
            data[client_id + 1] = {"train": train_ratings,
                                   "test": test_ratings}
            train_ratings_all.append(train_ratings)
            test_ratings_all.append(test_ratings)

        # Server (client id 0) holds all ratings stacked back together.
        data[0] = {
            "train": sp.vstack(train_ratings_all).tocsc(),
            "test": sp.vstack(test_ratings_all).tocsc()
        }
        with open(self.processed_data, 'wb') as f:
            pickle.dump(data, f)
        return data
class MovieLensData(object):
    """Download and split MovieLens matrix-factorization datasets.

    Subclasses provide the class attributes ``base_folder``, ``url``,
    ``filename``, ``zip_md5``, ``raw_file`` and ``raw_file_md5``, and a
    mixin supplies ``_split_n_clients_rating``.

    Arguments:
        root (string): the path of data
        num_client (int): the number of clients
        train_portion (float): the portion of training data
        download (bool): indicator to download dataset
    """
    def __init__(self, root, num_client, train_portion=0.9, download=True):
        super(MovieLensData, self).__init__()
        self.root = root
        self.data = None
        self.n_user = None
        self.n_item = None

        if download:
            self.download()

        if not self._check_integrity():
            # Was "corrupted.You can" -- missing separator between sentences.
            raise RuntimeError("Dataset not found or corrupted. " +
                               "You can use download=True to download it")
        ratings = self._load_meta()
        self.processed_data = os.path.join(self.root, self.base_folder,
                                           'processed_data.pkl')
        # Reuse a previously processed split when available.
        if os.path.exists(self.processed_data):
            with open(self.processed_data, 'rb') as f:
                self.data = pickle.load(f)
        else:
            logger.info(f"Processing data into {num_client} parties.")
            self.data = self._split_n_clients_rating(ratings, num_client,
                                                     1 - train_portion)

    def _split_train_test_ratings(self, ratings: csc_matrix,
                                  test_portion: float):
        """Randomly split the stored ratings into train/test matrices.

        Both returned CSC matrices keep the shape of ``ratings``; together
        they contain exactly its nonzero entries.
        """
        n_ratings = ratings.count_nonzero()
        id_test = np.random.choice(n_ratings,
                                   int(n_ratings * test_portion),
                                   replace=False)
        id_train = list(set(np.arange(n_ratings)) - set(id_test))

        ratings = ratings.tocoo()
        test = coo_matrix((ratings.data[id_test],
                           (ratings.row[id_test], ratings.col[id_test])),
                          shape=ratings.shape)
        train = coo_matrix((ratings.data[id_train],
                            (ratings.row[id_train], ratings.col[id_train])),
                           shape=ratings.shape)

        train_ratings, test_ratings = train.tocsc(), test.tocsc()
        return train_ratings, test_ratings

    def _read_raw(self):
        """Read the raw ``::``-separated ratings file into a DataFrame."""
        fpath = os.path.join(self.root, self.base_folder, self.filename,
                             self.raw_file)
        data = pd.read_csv(fpath,
                           sep="::",
                           engine="python",
                           usecols=[0, 1, 2],
                           names=["userId", "movieId", "rating"],
                           dtype={
                               "userId": np.int32,
                               "movieId": np.int32,
                               "rating": np.float32
                           })
        return data

    def _load_meta(self):
        """Build (or load the cached) user-by-item CSC rating matrix."""
        meta_path = os.path.join(self.root, self.base_folder, "ratings.pkl")

        if not os.path.exists(meta_path):
            logger.info("Processing ratings.")
            data = self._read_raw()

            # Map raw user/movie ids to contiguous 0-based indices.
            unique_id_item, unique_id_user = np.sort(
                data["movieId"].unique()), np.sort(data["userId"].unique())
            n_item, n_user = len(unique_id_item), len(unique_id_user)
            mapping_item, mapping_user = {
                mid: idx
                for idx, mid in enumerate(unique_id_item)
            }, {mid: idx
                for idx, mid in enumerate(unique_id_user)}
            # ``Series.iteritems`` was removed in pandas 2.0; iterate the
            # values directly instead.
            row = [mapping_user[uid] for uid in data["userId"].to_numpy()]
            col = [mapping_item[mid] for mid in data["movieId"].to_numpy()]
            ratings = coo_matrix((data["rating"], (row, col)),
                                 shape=(n_user, n_item))
            ratings = ratings.tocsc()
            with open(meta_path, 'wb') as f:
                pickle.dump(ratings, f)
            logger.info("Done.")
        else:
            with open(meta_path, 'rb') as f:
                ratings = pickle.load(f)
        self.n_user, self.n_item = ratings.shape
        return ratings

    def _check_integrity(self):
        """Return True iff the raw ratings file exists with the right md5."""
        fpath = os.path.join(self.root, self.base_folder, self.filename,
                             self.raw_file)
        return check_integrity(fpath, self.raw_file_md5)

    def download(self):
        """Download and extract the dataset archive unless already present."""
        if self._check_integrity():
            logger.info("Files already downloaded and verified")
            return
        download_and_extract_archive(self.url,
                                     os.path.join(self.root,
                                                  self.base_folder),
                                     filename=self.url.split('/')[-1],
                                     md5=self.zip_md5)
class MovieLens1M(MovieLensData):
    """MovieLens 1M Dataset (https://grouplens.org/datasets/movielens)

    Raw rating format: UserID::MovieID::Rating::Timestamp

    Arguments (inherited from ``MovieLensData``):
        root (str): Root directory of dataset where directory
            ``MovieLens1M`` exists or will be saved to if download is
            set to True.
        num_client (int): The number of clients the ratings are split into.
        train_portion (float, optional): The proportion of training data.
        download (bool, optional): If true, downloads the dataset from
            the internet and puts it in root directory. If dataset
            is already downloaded, it is not downloaded again.
    """
    base_folder = 'MovieLens1M'
    url = "https://files.grouplens.org/datasets/movielens/ml-1m.zip"
    filename = "ml-1m"
    zip_md5 = "c4d9eecfca2ab87c1945afe126590906"
    raw_file = "ratings.dat"
    raw_file_md5 = "a89aa3591bc97d6d4e0c89459ff39362"
class MovieLens10M(MovieLensData):
    """MovieLens 10M Dataset (https://grouplens.org/datasets/movielens)

    Raw rating format: UserID::MovieID::Rating::Timestamp

    Arguments (inherited from ``MovieLensData``):
        root (str): Root directory of dataset where directory
            ``MovieLens10M`` exists or will be saved to if download is
            set to True.
        num_client (int): The number of clients the ratings are split into.
        train_portion (float, optional): The proportion of training data.
        download (bool, optional): If true, downloads the dataset from
            the internet and puts it in root directory. If dataset
            is already downloaded, it is not downloaded again.
    """
    base_folder = 'MovieLens10M'
    url = "https://files.grouplens.org/datasets/movielens/ml-10m.zip"
    filename = "ml-10M100K"
    zip_md5 = "ce571fd55effeba0271552578f2648bd"
    raw_file = "ratings.dat"
    raw_file_md5 = "3f317698625386f66177629fa5c6b2dc"
class VFLMovieLens1M(MovieLens1M, VMFDataset):
    """MovieLens1M dataset in VFL setting: the ``VMFDataset`` mixin
    shards ratings by item (column) across clients."""
    pass
class HFLMovieLens1M(MovieLens1M, HMFDataset):
    """MovieLens1M dataset in HFL setting: the ``HMFDataset`` mixin
    shards ratings by user (row) across clients."""
    pass
class VFLMovieLens10M(MovieLens10M, VMFDataset):
    """MovieLens10M dataset in VFL setting: the ``VMFDataset`` mixin
    shards ratings by item (column) across clients."""
    pass
class HFLMovieLens10M(MovieLens10M, HMFDataset):
    """MovieLens10M dataset in HFL setting: the ``HMFDataset`` mixin
    shards ratings by user (row) across clients."""
    pass