mindspore/tests/st/gnn/aggregator.py

391 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Aggregator."""
import mindspore.nn as nn
from mindspore import Tensor, Parameter
from mindspore._checkparam import check_int_positive, check_bool
from mindspore._extends import cell_attr_register
from mindspore.common.initializer import initializer
from mindspore.nn.layer.activation import get_activation
from mindspore.ops import functional as F
from mindspore.ops import operations as P
class GNNFeatureTransform(nn.Cell):
    r"""
    Linear feature transform layer for GNN inputs.

    The two leading dimensions of the input are merged, a linear map is applied to the last
    dimension, and the leading dimensions are restored afterwards. The operation is:

    .. math::
        \text{outputs} = \text{inputs} * \text{kernel}^{T} + \text{bias},

    where :math:`\text{kernel}` is the weight matrix of shape (out_channels, in_channels) created
    by the layer, and :math:`\text{bias}` is a bias vector of shape (out_channels,) created by the
    layer (only if has_bias is True).

    Args:
        in_channels (int): The number of channels in the input space.
        out_channels (int): The number of channels in the output space.
        weight_init (Union[Tensor, str, Initializer, numbers.Number]): The trainable weight_init parameter.
            The values of str refer to the function `initializer`. Default: 'normal'.
        bias_init (Union[Tensor, str, Initializer, numbers.Number]): The trainable bias_init parameter.
            The values of str refer to the function `initializer`. Default: 'zeros'.
        has_bias (bool): Specifies whether the layer uses a bias vector. Default: True.

    Raises:
        ValueError: If weight_init is a Tensor whose shape is not (out_channels, in_channels), or
            bias_init is a Tensor whose shape is not (out_channels,) while has_bias is True.

    Inputs:
        - **x** (Tensor) - Tensor of rank 3 with shape :math:`(B, N, C)`; `construct` reads exactly
          three dimensions from it. :math:`C` is multiplied against the kernel, so it is expected
          to equal `in_channels`.

    Outputs:
        Tensor, the shape of the output tensor is :math:`(B, N, \text{out\_channels})`.

    Examples:
        >>> net = GNNFeatureTransform(3, 4)
        >>> input_x = Tensor(np.array(np.random.rand(2, 5, 3), dtype=np.float32))
        >>> output = net(input_x)
    """

    @cell_attr_register
    def __init__(self,
                 in_channels,
                 out_channels,
                 weight_init='normal',
                 bias_init='zeros',
                 has_bias=True):
        super(GNNFeatureTransform, self).__init__()
        self.in_channels = check_int_positive(in_channels)
        self.out_channels = check_int_positive(out_channels)
        self.has_bias = check_bool(has_bias)

        # A ready-made weight tensor must already be laid out as (out_channels, in_channels).
        if isinstance(weight_init, Tensor):
            weight_shape_ok = (weight_init.dim() == 2
                               and weight_init.shape[0] == out_channels
                               and weight_init.shape[1] == in_channels)
            if not weight_shape_ok:
                raise ValueError("weight_init shape error")
        self.weight = Parameter(initializer(weight_init, [out_channels, in_channels]), name="weight")

        if self.has_bias:
            # A ready-made bias tensor must be a vector with one entry per output channel.
            if isinstance(bias_init, Tensor):
                bias_shape_ok = bias_init.dim() == 1 and bias_init.shape[0] == out_channels
                if not bias_shape_ok:
                    raise ValueError("bias_init shape error")
            self.bias = Parameter(initializer(bias_init, [out_channels]), name="bias")

        # transpose_b=True multiplies by the transposed (out_channels, in_channels) weight.
        self.matmul = P.MatMul(transpose_b=True)
        self.bias_add = P.BiasAdd()

    def construct(self, x):
        """Apply the linear transform to the last dimension of a rank-3 tensor."""
        dims = F.shape(x)
        # Merge the two leading dimensions so the matmul sees a 2-D operand.
        flat = F.reshape(x, (dims[0] * dims[1], dims[2]))
        projected = self.matmul(flat, self.weight)
        if self.has_bias:
            projected = self.bias_add(projected, self.bias)
        # Restore the leading dimensions; the last one is now out_channels.
        return F.reshape(projected, (dims[0], dims[1], self.out_channels))

    def extend_repr(self):
        """Describe the layer configuration (and the bias, when present) as a string."""
        summary = 'in_channels={}, out_channels={}, weight={}, has_bias={}'.format(
            self.in_channels, self.out_channels, self.weight, self.has_bias)
        if not self.has_bias:
            return summary
        return summary + ', bias={}'.format(self.bias)
class _BaseAggregator(nn.Cell):
    """
    Common base class for GNN aggregators.

    Prepares the optional building blocks shared by aggregators (a linear feature transform,
    a dropout layer and an activation) and leaves `construct` to the subclasses.

    Args:
        feature_in_dim (int): Node or edge input feature dim.
        feature_out_dim (int): Node or edge output feature dim.
        use_fc (bool): Specifies whether a linear transformation is created for use before the
            message is aggregated. Default: True
        weight_init (Union[Tensor, str, Initializer, numbers.Number]): The trainable weight_init parameter.
            The values of str refer to the function `initializer`. Default: 'normal'.
        bias_init (Union[Tensor, str, Initializer, numbers.Number]): The trainable bias_init parameter.
            The values of str refer to the function `initializer`. Default: 'zeros'.
        has_bias (bool): Specifies whether the linear layer uses a bias vector. Default: True.
        dropout_ratio (float): Passed to `nn.Dropout` as `keep_prob`, i.e. it is the keep rate of
            the dropout layer. No dropout layer is created when it is None. Default: None.
        activation (str): Name of the activation, resolved through `get_activation`,
            eg. 'relu'. Default: None.

    Examples:
        >>> class MyAggregator(_BaseAggregator):
        >>>     def __init__(self, feature_in_dim, feature_out_dim):
        >>>         super(MyAggregator, self).__init__(feature_in_dim, feature_out_dim)
        >>>         self.reduce_sum = P.ReduceSum()
        >>>
        >>>     def construct(self, x):
        >>>         return self.reduce_sum(x, 1)
    """

    def __init__(self,
                 feature_in_dim,
                 feature_out_dim,
                 use_fc=True,
                 weight_init="normal",
                 bias_init="zeros",
                 has_bias=True,
                 dropout_ratio=None,
                 activation=None):
        super(_BaseAggregator, self).__init__()
        self.in_dim = feature_in_dim
        self.out_dim = feature_out_dim

        # Optional linear transform; its settings are only recorded when it is enabled.
        self.use_fc = use_fc
        if use_fc:
            self.weight_init = weight_init
            self.bias_init = bias_init
            self.has_bias = has_bias
            self.fc = GNNFeatureTransform(feature_in_dim,
                                          feature_out_dim,
                                          weight_init=weight_init,
                                          bias_init=bias_init,
                                          has_bias=has_bias)

        # Optional dropout; the flag tells subclasses whether self.dropout exists.
        self.dropout_ratio = dropout_ratio
        use_dropout = dropout_ratio is not None
        if use_dropout:
            self.dropout = nn.Dropout(keep_prob=dropout_ratio)
        self.dropout_flag = use_dropout

        # Optional activation; the flag records whether get_activation returned one.
        act_cell = get_activation(activation)
        self.activation = act_cell
        self.activation_flag = act_cell is not None

    def construct(self, **kward):
        """Placeholder; every subclass has to provide its own implementation."""
        raise NotImplementedError
class MeanAggregator(_BaseAggregator):
    """
    Mean Aggregator of GNN.

    Optionally applies the linear transform, dropout and activation configured on the base
    class (in that order) and then averages the result over axis 1.

    Args:
        feature_in_dim (int): Node or edge input feature dim.
        feature_out_dim (int): Node or edge output feature dim.
        use_fc (bool): Specifies whether a linear transformation is applied before the message
            is aggregated. Default: True
        weight_init (Union[Tensor, str, Initializer, numbers.Number]): The trainable weight_init parameter.
            The values of str refer to the function `initializer`. Default: 'normal'.
        bias_init (Union[Tensor, str, Initializer, numbers.Number]): The trainable bias_init parameter.
            The values of str refer to the function `initializer`. Default: 'zeros'.
        has_bias (bool): Specifies whether the linear layer uses a bias vector. Default: True.
        dropout_ratio (float): The keep rate of dropout layer, greater than 0 and less equal than 1.
            Default: None.
        activation (str): Name of the activation applied before averaging, eg. 'relu'. Default: None.

    Examples:
        >>> net = MeanAggregator(32, 64, activation="relu", dropout_ratio=0.5)
        >>> input_data = Tensor(np.array(np.random.rand(32, 3, 32), dtype=np.float32))
        >>> output = net(input_data)
    """

    def __init__(self,
                 feature_in_dim,
                 feature_out_dim,
                 use_fc=True,
                 weight_init="normal",
                 bias_init="zeros",
                 has_bias=True,
                 dropout_ratio=None,
                 activation=None):
        super(MeanAggregator, self).__init__(
            feature_in_dim,
            feature_out_dim,
            use_fc=use_fc,
            weight_init=weight_init,
            bias_init=bias_init,
            has_bias=has_bias,
            dropout_ratio=dropout_ratio,
            activation=activation)
        # keep_dims=False drops the averaged axis from the output.
        self.reduce_mean = P.ReduceMean(keep_dims=False)

    def construct(self, input_feature):
        """Run the enabled pre-processing stages, then average over axis 1."""
        feature = input_feature
        if self.use_fc:
            feature = self.fc(feature)
        if self.dropout_flag:
            feature = self.dropout(feature)
        if self.activation_flag:
            feature = self.activation(feature)
        return self.reduce_mean(feature, 1)
class AttentionHead(nn.Cell):
    """
    Attention Head for Graph Attention Networks.

    Args:
        in_channel (int): The number of input channel, input feature dim.
        out_channel (int): The number of output channel, output feature dim.
        in_drop_ratio (float): Input feature dropout ratio (the dropout layers are built with
            keep_prob = 1 - in_drop_ratio), default 0.0.
        coef_drop_ratio (float): Coefficient dropout ratio (keep_prob = 1 - coef_drop_ratio),
            default 0.0.
        residual (bool): Whether to use residual connection, default False.
        coef_activation (Cell): The attention coefficient activation function,
            default nn.LeakyReLU().
        activation (Cell): The output activation function, default nn.ELU(). Pass None to skip
            the output activation.

    Inputs:
        - **input_feature** (Tensor) - Tensor of shape : (batch_size, num_nodes, feature_dim).
        - **bias_mat** (Tensor) - Tensor of shape : (batch_size, num_nodes, num_nodes).

        The result is squeezed on axis 0 before the bias is added, so batch_size is expected
        to be 1.

    Examples:
        >>> head = AttentionHead(1433,
        ...                      8,
        ...                      in_drop_ratio=0.6,
        ...                      coef_drop_ratio=0.6,
        ...                      residual=False)
        >>> input_data = Tensor(np.array(np.random.rand(1, 2708, 1433), dtype=np.float32))
        >>> bias_mat = Tensor(np.array(np.random.rand(1, 2708, 2708), dtype=np.float32))
        >>> output = head(input_data, bias_mat)
    """

    # NOTE(review): the default `coef_activation` / `activation` cells are created once at
    # definition time and therefore shared by every head that relies on the defaults. They are
    # kept as-is because None already means "no activation" for `activation`.
    def __init__(self,
                 in_channel,
                 out_channel,
                 in_drop_ratio=0.0,
                 coef_drop_ratio=0.0,
                 residual=False,
                 coef_activation=nn.LeakyReLU(),
                 activation=nn.ELU()):
        super(AttentionHead, self).__init__()
        self.in_channel = check_int_positive(in_channel)
        self.out_channel = check_int_positive(out_channel)
        self.in_drop_ratio = in_drop_ratio
        # Two independent dropout layers: one for the raw input, one for the transformed feature.
        self.in_drop = nn.Dropout(keep_prob=1 - in_drop_ratio)
        self.in_drop_2 = nn.Dropout(keep_prob=1 - in_drop_ratio)
        self.feature_transform = GNNFeatureTransform(
            in_channels=self.in_channel,
            out_channels=self.out_channel,
            has_bias=False)

        # Each of these maps every node feature to a single attention score.
        self.f_1_transform = GNNFeatureTransform(
            in_channels=self.out_channel,
            out_channels=1)
        self.f_2_transform = GNNFeatureTransform(
            in_channels=self.out_channel,
            out_channels=1)
        self.softmax = nn.Softmax()

        self.coef_drop = nn.Dropout(keep_prob=1 - coef_drop_ratio)
        self.batch_matmul = P.BatchMatMul()
        self.bias_add = P.BiasAdd()
        self.bias = Parameter(initializer('zeros', self.out_channel), name='bias')

        self.residual = check_bool(residual)
        # construct() reads residual_transform_flag whenever residual is enabled, so both
        # residual attributes are always defined. Previously the flag was missing when
        # residual=True and in_channel == out_channel, which broke the plain skip connection.
        if self.residual and in_channel != out_channel:
            # Channel sizes differ: project the input before adding it to the output.
            self.residual_transform_flag = True
            self.residual_transform = GNNFeatureTransform(
                in_channels=self.in_channel,
                out_channels=self.out_channel)
        else:
            self.residual_transform_flag = False
            self.residual_transform = None

        self.coef_activation = coef_activation
        self.activation = activation

    def construct(self, input_feature, bias_mat):
        """Compute attention-weighted node features for one head."""
        input_feature = self.in_drop(input_feature)

        feature = self.feature_transform(input_feature)
        # self attention following the author
        f_1 = self.f_1_transform(feature)
        f_2 = self.f_2_transform(feature)
        # f_1 has a trailing dim of 1 and the transposed f_2 has it on axis 1, so the sum
        # yields one logit per (node, node) pair.
        logits = f_1 + P.Transpose()(f_2, (0, 2, 1))
        logits = self.coef_activation(logits) + bias_mat
        coefs = self.softmax(logits)

        coefs = self.coef_drop(coefs)
        feature = self.in_drop_2(feature)

        ret = self.batch_matmul(coefs, feature)
        # Drop the leading axis for the bias add, then put it back.
        ret = P.Squeeze(0)(ret)
        ret = self.bias_add(ret, self.bias)
        ret = P.ExpandDims()(ret, 0)
        # residual connection
        if self.residual:
            if self.residual_transform_flag:
                res = self.residual_transform(input_feature)
                ret = ret + res
            else:
                ret = ret + input_feature
        # activation
        if self.activation is not None:
            ret = self.activation(ret)
        return ret
class AttentionAggregator(nn.Cell):
    """
    Attention Aggregator for Graph Attention Networks, can be regarded as one
    GAT layer made of several attention heads.

    Args:
        in_channels (int): Input channel.
        out_channels (int): Output channel of each attention head.
        num_heads (int): Number of attention heads for this layer, default 1.
        in_drop (float): Input feature dropout ratio, default 0.0.
        coef_drop (float): Coefficient dropout ratio, default 0.0.
        activation (Cell): The output activation function, default nn.ELU().
        residual (bool): Whether to use residual connection, default False.
        output_transform (str['concat', 'sum']): How the head outputs are combined: 'concat'
            joins them along the last axis, 'sum' adds them element-wise. Default 'concat'.

    Raises:
        ValueError: If output_transform is neither 'concat' nor 'sum'.

    Inputs:
        - **input_feature** (Tensor) - Tensor of shape : (batch_size, num_nodes, feature_dim).
        - **bias_mat** (Tensor) - Tensor of shape : (batch_size, num_nodes, num_nodes).

    Examples:
        >>> input_data = Tensor(np.array(np.random.rand(1, 2708, 1433), dtype=np.float32))
        >>> biases = Tensor(np.array(np.random.rand(1, 2708, 2708), dtype=np.float32))
        >>> net = AttentionAggregator(1433,
        ...                           8,
        ...                           8)
        >>> net(input_data, biases)
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 num_heads=1,
                 in_drop=0.0,
                 coef_drop=0.0,
                 activation=nn.ELU(),
                 residual=False,
                 output_transform='concat'):
        super(AttentionAggregator, self).__init__()
        self.num_heads = num_heads
        # Build the heads in a local list so the cell only ever holds the CellList.
        heads = [AttentionHead(in_channels,
                               out_channels,
                               in_drop_ratio=in_drop,
                               coef_drop_ratio=coef_drop,
                               activation=activation,
                               residual=residual)
                 for _ in range(num_heads)]
        self.attns = nn.layer.CellList(heads)

        if output_transform == 'concat':
            self.out_trans = P.Concat(-1)
        elif output_transform == 'sum':
            self.out_trans = P.AddN()
        else:
            raise ValueError(
                "output_transform must be 'concat' or 'sum', but got {!r}".format(output_transform))

    def construct(self, input_data, bias_mat):
        """Run every attention head on the same inputs and combine their outputs."""
        res = ()
        # Index-based loop kept from the original implementation.
        for i in range(self.num_heads):
            res += (self.attns[i](input_data, bias_mat),)
        return self.out_trans(res)