import torch
from typing import List
from torch_geometric.data import Data
from torch_geometric.utils import to_networkx, to_dense_adj, dense_to_sparse
class Analyzer(object):
    r"""Analyzer for raw graph and split subgraphs.

    Arguments:
        raw_data (PyG.data): raw graph.
        split_data (list): the list for subgraphs split by splitter.
    """
    def __init__(self, raw_data: Data, split_data: List[Data]):
        self.raw_data = raw_data
        self.split_data = split_data
        # Undirected NetworkX views, used for topology statistics
        # (edge counts, clustering coefficient).
        self.raw_graph = to_networkx(raw_data, to_undirected=True)
        self.sub_graphs = [
            to_networkx(g, to_undirected=True) for g in split_data
        ]

    def num_missing_edge(self):
        r"""Count raw-graph edges lost by the split.

        Returns:
            tuple: ``(missing_edge, rate_missing_edge)``, the absolute
            number of undirected edges lost and its ratio to the raw
            undirected edge count.
        """
        num_raw_edges = len(self.raw_graph.edges)
        # fl_adj() is a [2, E] directed edge index; E // 2 is the number
        # of undirected edges preserved across all subgraphs.
        missing_edge = num_raw_edges - self.fl_adj().shape[1] // 2
        rate_missing_edge = missing_edge / num_raw_edges
        return missing_edge, rate_missing_edge

    def fl_adj(self):
        r"""Rebuild the preserved edges in the raw graph's node numbering.

        Returns:
            torch.Tensor: ``[2, E]`` edge index of every edge kept by the
            split, expressed with the raw node ids.

        Raises:
            KeyError: if the subgraphs carry no ``index_orig`` field
                mapping local node ids back to the raw ones.
        """
        # Fail fast before allocating the dense adjacency.
        if 'index_orig' not in self.split_data[0]:
            raise KeyError('index_orig not in Split Data.')
        raw_adj = to_dense_adj(self.raw_data.edge_index)[0]
        adj = torch.zeros_like(raw_adj)
        for sub_g in self.split_data:
            # Translate each local (row, col) pair back to raw node ids.
            for row, col in sub_g.edge_index.T:
                adj[sub_g.index_orig[row.item()]][
                    sub_g.index_orig[col.item()]] = 1
        return dense_to_sparse(adj)[0]

    def fl_data(self):
        r"""Build a ``Data`` mirroring the raw graph with only kept edges.

        Returns:
            Data: copy of ``raw_data`` whose ``edge_index`` is replaced by
            the split edge index from :meth:`fl_adj`.
        """
        fl_data = Data()
        for key, item in self.raw_data:
            fl_data[key] = self.fl_adj() if key == 'edge_index' else item
        return fl_data

    def missing_data(self):
        r"""Build a ``Data`` whose edges are exactly the ones the split lost.

        Returns:
            Data: copy of ``raw_data`` whose ``edge_index`` holds only the
            missing (directed) edges; ``[2, 0]`` when nothing is missing.
        """
        ms_data = Data()
        raw_edge_set = {tuple(x) for x in self.raw_data.edge_index.T.numpy()}
        split_edge_set = {
            tuple(x)
            for x in self.fl_data().edge_index.T.numpy()
        }
        ms_set = raw_edge_set - split_edge_set
        for key, item in self.raw_data:
            if key == 'edge_index':
                if ms_set:
                    ms_data[key] = torch.tensor([list(x) for x in ms_set],
                                                dtype=torch.int64).T
                else:
                    # torch.tensor([]).T would be 1-D; keep a well-formed
                    # [2, 0] edge index when no edge is missing.
                    ms_data[key] = torch.empty((2, 0), dtype=torch.int64)
            else:
                ms_data[key] = item
        return ms_data

    def portion_ms_node(self):
        r"""Proportion of nodes per subgraph that lost at least one edge.

        Returns:
            list: one ratio per subgraph in ``split_data``.
        """
        cnt_list = []
        # Source endpoints of every missing edge, in raw node ids.
        ms_set = {x.item() for x in set(self.missing_data().edge_index[0])}
        for sub_data in self.split_data:
            cnt = sum(1 for idx in sub_data.index_orig
                      if idx.item() in ms_set)
            cnt_list.append(cnt / sub_data.num_nodes)
        return cnt_list

    def average_clustering(self):
        r"""Average clustering coefficient before and after the split.

        Returns:
            tuple: coefficients for the raw graph and for the split graph.
        """
        import networkx.algorithms.cluster as cluster
        return cluster.average_clustering(
            self.raw_graph), cluster.average_clustering(
                to_networkx(self.fl_data()))

    def homophily_value(self, edge_index, y):
        r"""Edge homophily: fraction of edges joining same-label endpoints.

        Arguments:
            edge_index: ``[2, E]`` tensor or ``torch_sparse.SparseTensor``.
            y: node label tensor.

        Returns:
            float: homophily ratio in ``[0, 1]``.
        """
        # torch_sparse is an optional dependency, only required when the
        # caller actually passes a SparseTensor.
        try:
            from torch_sparse import SparseTensor
        except ImportError:
            SparseTensor = None
        if SparseTensor is not None and isinstance(edge_index, SparseTensor):
            row, col, _ = edge_index.coo()
        else:
            row, col = edge_index
        return int((y[row] == y[col]).sum()) / row.size(0)

    def homophily(self):
        r"""Homophily of the raw graph and of the split graph.

        Returns:
            tuple: ``(raw_homophily, split_homophily)``.
        """
        # Build the split graph once; each fl_data() call re-runs the
        # expensive dense-adjacency reconstruction in fl_adj().
        fl_data = self.fl_data()
        return (self.homophily_value(self.raw_data.edge_index,
                                     self.raw_data.y),
                self.homophily_value(fl_data.edge_index, fl_data.y))

    def hamming_distance_graph(self, data):
        r"""Average Hamming distance of node features across edges.

        Arguments:
            data: graph with ``edge_index`` (``[2, E]``) and node feature
                matrix ``x``.

        Returns:
            float: mean number of differing feature entries per edge.
        """
        edge_index, x = data.edge_index, data.x
        # Vectorized over all edges at once instead of a per-edge Python
        # loop; identical result.
        diff = (x[edge_index[0]] != x[edge_index[1]]).sum().item()
        return diff / edge_index.shape[1]

    def hamming(self):
        r"""Average feature Hamming distance for the raw graph, the split
        graph, and the missing-edge graph.

        Returns:
            tuple: the three distances, in that order.
        """
        return self.hamming_distance_graph(
            self.raw_data), self.hamming_distance_graph(
                self.fl_data()), self.hamming_distance_graph(
                    self.missing_data())