forked from mindspore-Ecosystem/mindspore
add enable_parameter_server flag
This commit is contained in:
parent
28c8a5cc26
commit
1641df4c63
|
@ -45,8 +45,35 @@ def _init_allreduce_operators(length):
|
|||
return op_list
|
||||
|
||||
|
||||
@reduce_opt.register("Number", "Bool", "Function", "Function", "Bool", "Tensor")
|
||||
def _tensors_allreduce(degree, mean, allgather, allreduce, allreduce_filter, grad):
|
||||
"""
|
||||
Apply allreduce on gradient.
|
||||
|
||||
Args:
|
||||
degree (int): The mean coefficient.
|
||||
mean (bool): When mean is true, the mean coefficient (degree) would apply on gradients.
|
||||
allgather (Primitive): The communication operator for sparse gradients.
|
||||
allreduce (Primitive): The communication operator for gradients.
|
||||
allreduce_filter (bool): When it is true, allreduce would apply.
|
||||
grad (Tensor): The gradient tensor before operation.
|
||||
|
||||
Returns:
|
||||
Tensor, the gradient tensor after operation.
|
||||
"""
|
||||
if allreduce_filter:
|
||||
grad = allreduce(grad)
|
||||
if mean:
|
||||
degree = F.scalar_cast(degree, F.dtype(grad))
|
||||
cast_op = P.Cast()
|
||||
mul_op = P.Mul()
|
||||
grad = mul_op(grad, cast_op(F.scalar_to_array(1.0 / degree), F.dtype(grad)))
|
||||
return grad
|
||||
return grad
|
||||
|
||||
|
||||
@reduce_opt.register("Number", "Bool", "Function", "Function", "Bool", "Tensor", "Bool")
|
||||
def _tensors_allreduce(degree, mean, allgather, allreduce, allreduce_filter, grad, ps_parameter):
|
||||
def _tensors_allreduce_ps(degree, mean, allgather, allreduce, allreduce_filter, grad, ps_parameter):
|
||||
"""
|
||||
Apply allreduce on gradient.
|
||||
|
||||
|
@ -76,8 +103,37 @@ def _tensors_allreduce(degree, mean, allgather, allreduce, allreduce_filter, gra
|
|||
return grad
|
||||
|
||||
|
||||
@reduce_opt.register("Number", "Bool", "Function", "Function", "Bool", "IndexedSlices")
|
||||
def _tensors_allreduce_with_sparse(degree, mean, allgather, allreduce, allreduce_filter, grad):
|
||||
"""
|
||||
Apply allgather on gradient instead of allreduce for sparse feature.
|
||||
Allgather is a communication operation used for distributed deep learning.
|
||||
|
||||
Args:
|
||||
degree (int): The mean coefficient.
|
||||
mean (bool): When mean is true, the mean coefficient (degree) would apply on gradients.
|
||||
allgather (Primitive): The communication operator for sparse gradients.
|
||||
allreduce (Primitive): The communication operator for gradients.
|
||||
allreduce_filter (bool): When it is true, allgather would apply.
|
||||
grad (tuple): The indices, gradient tensor and tensor_shape before operation.
|
||||
|
||||
Returns:
|
||||
IndexedSlices, the gradient after operation.
|
||||
"""
|
||||
if allreduce_filter:
|
||||
indices = allgather(grad.indices())
|
||||
dout = allgather(grad.values())
|
||||
if mean:
|
||||
degree = F.scalar_cast(degree, F.dtype(grad.values()))
|
||||
cast_op = P.Cast()
|
||||
mul_op = P.Mul()
|
||||
dout = mul_op(dout, cast_op(F.scalar_to_array(1.0 / degree), F.dtype(dout)))
|
||||
grad = IndexedSlices(indices, dout, grad.dense_shape())
|
||||
return grad
|
||||
|
||||
|
||||
@reduce_opt.register("Number", "Bool", "Function", "Function", "Bool", "IndexedSlices", "Bool")
|
||||
def _tensors_allreduce_with_sparse(degree, mean, allgather, allreduce, allreduce_filter, grad, ps_parameter):
|
||||
def _tensors_allreduce_with_sparse_ps(degree, mean, allgather, allreduce, allreduce_filter, grad, ps_parameter):
|
||||
"""
|
||||
Apply allgather on gradient instead of allreduce for sparse feature.
|
||||
Allgather is a communication operation used for distributed deep learning.
|
||||
|
@ -269,6 +325,7 @@ class DistributedGradReducer(Cell):
|
|||
self.allgather = AllGather(GlobalComm.WORLD_COMM_GROUP)
|
||||
ps_filter = lambda x: x.is_param_ps
|
||||
self.ps_parameters = tuple(ps_filter(x) for x in parameters)
|
||||
self.enable_parameter_server = any(self.ps_parameters)
|
||||
|
||||
def construct(self, grads):
|
||||
"""
|
||||
|
@ -285,10 +342,18 @@ class DistributedGradReducer(Cell):
|
|||
datatypes = self.map_(F.partial(_get_datatype), grads)
|
||||
grads = self.map_(F.partial(_cast_datatype, mstype.float32), grads)
|
||||
if self.split_fusion:
|
||||
new_grad = self.map_(F.partial(reduce_opt, self.degree, self.mean, self.allgather),
|
||||
self.opt_list, self.allreduce_filter, grads, self.ps_parameters)
|
||||
if self.enable_parameter_server:
|
||||
new_grad = self.map_(F.partial(reduce_opt, self.degree, self.mean, self.allgather),
|
||||
self.opt_list, self.allreduce_filter, grads, self.ps_parameters)
|
||||
else:
|
||||
new_grad = self.map_(F.partial(reduce_opt, self.degree, self.mean, self.allgather),
|
||||
self.opt_list, self.allreduce_filter, grads)
|
||||
else:
|
||||
new_grad = self.map_(F.partial(reduce_opt, self.degree, self.mean, self.allgather,
|
||||
self.allreduce), self.allreduce_filter, grads, self.ps_parameters)
|
||||
if self.enable_parameter_server:
|
||||
new_grad = self.map_(F.partial(reduce_opt, self.degree, self.mean, self.allgather,
|
||||
self.allreduce), self.allreduce_filter, grads, self.ps_parameters)
|
||||
else:
|
||||
new_grad = self.map_(F.partial(reduce_opt, self.degree, self.mean, self.allgather,
|
||||
self.allreduce), self.allreduce_filter, grads)
|
||||
new_grad = self.map_(F.partial(_cast_datatype), datatypes, new_grad)
|
||||
return new_grad
|
||||
|
|
Loading…
Reference in New Issue