import scipy.sparse as sp
from scipy.sparse import linalg
import numpy as np
import torch
# def build_sparse_matrix(device, lap):
# lap = lap.tocoo()
# indices = np.column_stack((lap.row, lap.col))
# # this is to ensure row-major ordering to equal torch.sparse.sparse_reorder(L)
# indices = indices[np.lexsort((indices[:, 0], indices[:, 1]))]
# lap = torch.sparse_coo_tensor(indices.T, lap.data, lap.shape, device=device)
# return lap.to(torch.float32)
def build_sparse_matrix(device, lap):
    """
    Convert a SciPy sparse matrix into a float32 torch sparse COO tensor.

    Args:
        device: target device, forwarded to ``Tensor.to`` (e.g. ``"cpu"``)
        lap: the Laplacian (or any 2-D matrix) to convert; anything accepted by
            ``scipy.sparse.coo_matrix`` works, so CSR/CSC inputs no longer fail
            on the missing ``row`` / ``col`` attributes

    Returns:
        torch.Tensor: sparse COO tensor of dtype float32 with the same shape
        as ``lap``, placed on ``device``
    """
    # Normalise to COO first: only that format exposes row / col / data.
    lap = sp.coo_matrix(lap)
    indices = torch.tensor(np.vstack((lap.row, lap.col)), dtype=torch.long)
    values = torch.tensor(lap.data, dtype=torch.float32)
    # torch.sparse_coo_tensor replaces the deprecated torch.sparse.FloatTensor constructor.
    sparse = torch.sparse_coo_tensor(indices, values, torch.Size(lap.shape))
    return sparse.to(device)
def get_cheb_polynomial(l_tilde, k):
    """
    Compute the Chebyshev polynomials T_0 .. T_k of a scaled Laplacian.

    Uses the recurrence T_i = 2 * L~ * T_{i-1} - T_{i-2},
    starting from T_0 = I and T_1 = L~.

    Args:
        l_tilde: scaled Laplacian, shape (N, N); anything accepted by
            ``scipy.sparse.coo_matrix`` (sparse matrix or dense array)
        k(int): the maximum order of chebyshev polynomials

    Returns:
        list(scipy.sparse.coo_matrix): [T_0, ..., T_k], i.e. k + 1 matrices.
        T_0 and T_1 are always included, so the list has two entries
        even when k < 1.
    """
    l_tilde = sp.coo_matrix(l_tilde)
    num = l_tilde.shape[0]
    cheb_polynomials = [sp.eye(num).tocoo(), l_tilde.copy()]
    # Loop-invariant factor: scale and convert to CSR once instead of per iteration.
    doubled = (2 * l_tilde).tocsr()
    for i in range(2, k + 1):
        cheb_i = doubled.dot(cheb_polynomials[i - 1]) - cheb_polynomials[i - 2]
        cheb_polynomials.append(cheb_i.tocoo())
    return cheb_polynomials
def get_supports_matrix(adj_mx, filter_type='laplacian', undirected=True):
    """
    Build the list of support matrices for the requested filter type.

    Args:
        adj_mx: adjacency matrix
        filter_type: which support(s) to build:
            "laplacian" -> one scaled Laplacian, with lambda_max estimated
            from the matrix (lambda_max=None) and ``undirected`` forwarded;
            "random_walk" -> the transposed random-walk matrix of adj_mx;
            "dual_random_walk" -> the transposed random-walk matrices of
            adj_mx and of adj_mx.T, in that order;
            anything else -> one scaled Laplacian with the default arguments
            of ``calculate_scaled_laplacian``
        undirected: only used by the "laplacian" filter type

    Returns:
        list: the support matrices, one or two entries
    """
    if filter_type == "laplacian":
        return [calculate_scaled_laplacian(adj_mx, lambda_max=None, undirected=undirected)]
    if filter_type == "random_walk":
        return [calculate_random_walk_matrix(adj_mx).T]
    if filter_type == "dual_random_walk":
        forward = calculate_random_walk_matrix(adj_mx).T
        backward = calculate_random_walk_matrix(adj_mx.T).T
        return [forward, backward]
    # Unknown filter types fall back to the default scaled Laplacian.
    return [calculate_scaled_laplacian(adj_mx)]
def calculate_normalized_laplacian(adj):
    """
    Symmetric normalized Laplacian:
    L = D^-1/2 (D-A) D^-1/2 = I - D^-1/2 A D^-1/2

    D holds the row sums of ``adj``. The product is evaluated as
    (A D^-1/2)^T D^-1/2, which matches the formula above when A is symmetric.

    Args:
        adj: adj matrix; anything accepted by ``scipy.sparse.coo_matrix``

    Returns:
        SciPy sparse matrix (not a dense np.ndarray): L
    """
    adj = sp.coo_matrix(adj)
    d = np.array(adj.sum(1))
    # Zero-degree nodes yield inf here; silence the divide-by-zero
    # warning because those entries are zeroed out right below.
    with np.errstate(divide='ignore'):
        d_inv_sqrt = np.power(d, -0.5).flatten()
    d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
    d_mat_inv_sqrt = sp.diags(d_inv_sqrt)
    normalized_adj = adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt)
    return sp.eye(adj.shape[0]) - normalized_adj
def calculate_random_walk_matrix(adj_mx):
    """
    Random-walk transition matrix: L = D^-1 * A

    D holds the row sums of ``adj_mx``; rows whose sum is zero stay all-zero.

    Args:
        adj_mx: adj matrix; anything accepted by ``scipy.sparse.coo_matrix``

    Returns:
        scipy.sparse.coo_matrix: L
    """
    adj_mx = sp.coo_matrix(adj_mx)
    d = np.array(adj_mx.sum(1)).flatten()
    # NumPy refuses negative integer powers of integer arrays, so integer or
    # boolean adjacency matrices need their degrees promoted to float first.
    if d.dtype.kind in 'iub':
        d = d.astype(np.float64)
    # Zero-degree rows yield inf; silence the warning and zero them below.
    with np.errstate(divide='ignore'):
        d_inv = np.power(d, -1)
    d_inv[np.isinf(d_inv)] = 0.
    d_mat_inv = sp.diags(d_inv)
    return d_mat_inv.dot(adj_mx).tocoo()
def calculate_scaled_laplacian(adj_mx, lambda_max=2, undirected=True):
    """
    Compute the rescaled Laplacian ~L = 2 * L / lambda_max - I, where L is
    the result of ``calculate_normalized_laplacian``.

    Args:
        adj_mx: adjacency matrix; anything accepted by ``scipy.sparse.coo_matrix``
        lambda_max: value used to rescale L; when None it is taken as the
            largest-magnitude eigenvalue of L, computed with ``eigsh``
        undirected: when True, the adjacency matrix is symmetrised first by
            taking the element-wise maximum of adj_mx and its transpose

    Returns:
        scipy.sparse.coo_matrix: ~L as float32
    """
    adj_mx = sp.coo_matrix(adj_mx)
    if undirected:
        # Element-wise max(A, A^T): keeps the larger weight of each edge pair.
        adj_mx = adj_mx.maximum(adj_mx.T)
    lap = calculate_normalized_laplacian(adj_mx)
    if lambda_max is None:
        eigenvalues, _ = linalg.eigsh(lap, 1, which='LM')
        lambda_max = eigenvalues[0]
    lap = sp.csr_matrix(lap)
    identity = sp.identity(lap.shape[0], format='csr', dtype=lap.dtype)
    lap = (2 / lambda_max * lap) - identity
    return lap.astype(np.float32).tocoo()