!6391 Add PS pointer check.

Merge pull request !6391 from ZPaC/master-pointer-check
mindspore-ci-bot 2020-09-17 19:31:38 +08:00 committed by Gitee
commit 65d4fdc5bb
6 changed files with 186 additions and 32 deletions
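Every hunk in this patch follows the same shape: null-check the destination and source with MS_EXCEPTION_IF_NULL, compute explicit dst_size/src_size values, call memcpy_s/memset_s with them, and treat a non-zero return code as fatal. Below is a minimal standalone sketch of that guarded-copy pattern; safe_copy is an illustrative name rather than anything from the patch, and std::memcpy plus std::runtime_error stand in for the securec memcpy_s and MS_LOG(EXCEPTION).

#include <cstddef>
#include <cstring>
#include <stdexcept>

// Validate both pointers and both sizes before touching memory.
int safe_copy(void *dst_data, std::size_t dst_size, const void *src_data, std::size_t src_size) {
  // MS_EXCEPTION_IF_NULL equivalent: fail loudly instead of dereferencing null.
  if (dst_data == nullptr || src_data == nullptr) {
    throw std::runtime_error("null pointer passed to safe_copy");
  }
  // memcpy_s refuses to overflow the destination; report an error code instead.
  if (src_size > dst_size) {
    return -1;
  }
  std::memcpy(dst_data, src_data, src_size);
  return 0;  // 0 means success, matching the `ret != 0` checks in the diff.
}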

View File

@ -62,7 +62,13 @@ void OptimizerInfo::UpdateOptimInputValue(const std::string &optim_type, const s
size_t size = lens[ps_send_index] * sizeof(T);
size_t offset = std::accumulate(lens.begin(), lens.begin() + ps_send_index, 0, std::plus<int>());
AddressPtr optim_input = inputs_[origin_index];
- int ret = memcpy_s(optim_input->addr, optim_input->size, reinterpret_cast<T *>(data) + offset, size);
+ MS_EXCEPTION_IF_NULL(optim_input);
+ void *dst_data = optim_input->addr;
+ T *src_data = reinterpret_cast<T *>(data) + offset;
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ int ret = memcpy_s(optim_input->addr, optim_input->size, src_data, size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@ -71,6 +77,7 @@ void OptimizerInfo::UpdateOptimInputValue(const std::string &optim_type, const s
}
void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
MS_EXCEPTION_IF_NULL(gradient()->addr);
float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
size_t size = gradient()->size / sizeof(float);
size_t grad_index = this->grad_index();
@ -96,11 +103,19 @@ void DenseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &, size_
}
}
- void DenseOptimInfo::Reset() { memset_s(gradient()->addr, gradient()->size, 0x00, gradient()->size); }
+ void DenseOptimInfo::Reset() {
+ MS_EXCEPTION_IF_NULL(gradient()->addr);
+ int ret = memset_s(gradient()->addr, gradient()->size, 0x00, gradient()->size);
+ if (ret != 0) {
+ MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
+ return;
+ }
+ }
void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
// Append grad data to the end
float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
MS_EXCEPTION_IF_NULL(accum_grad_data);
size_t grad_index = this->grad_index();
size_t grad_offset = 0;
@ -108,10 +123,16 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
grad_offset += lengths[i];
}
float *incr_grad_data = values.data() + grad_offset;
MS_EXCEPTION_IF_NULL(incr_grad_data);
size_t incr_grad_size = lengths[grad_index] * sizeof(float);
size_t dst_size = incr_grad_size;
size_t src_size = incr_grad_size;
- auto ret = memcpy_s(accum_grad_data + grads_offset_, dst_size, incr_grad_data, src_size);
+ void *dst_data = accum_grad_data + grads_offset_;
+ void *src_data = incr_grad_data;
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@ -121,6 +142,7 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
// Append indice data to the end
int *accum_indices_data = reinterpret_cast<int *>(indices()->addr);
MS_EXCEPTION_IF_NULL(accum_indices_data);
size_t indices_index = this->indices_index();
size_t indice_offset = 0;
@ -128,12 +150,16 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
indice_offset += lengths[i];
}
int *incr_indice_data = reinterpret_cast<int *>(values.data()) + indice_offset;
MS_EXCEPTION_IF_NULL(incr_indice_data);
size_t incr_indice_size = lengths[indices_index];
size_t incr_indice_data_size = incr_indice_size * sizeof(int);
dst_size = incr_indice_data_size;
src_size = incr_indice_data_size;
- auto ret2 =
- memcpy_s(accum_indices_data + indices_offset_, incr_indice_data_size, incr_indice_data, incr_indice_data_size);
+ dst_data = accum_indices_data + indices_offset_;
+ src_data = incr_indice_data;
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ auto ret2 = memcpy_s(dst_data, dst_size, src_data, src_size);
if (ret2 != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret2 << ")";
return;
@ -167,6 +193,8 @@ void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes
MS_LOG(ERROR) << "Invalid first dim size";
}
MS_EXCEPTION_IF_NULL(gradient()->addr);
MS_EXCEPTION_IF_NULL(indices()->addr);
float *grad_data = reinterpret_cast<float *>(gradient()->addr);
int *indices_data = reinterpret_cast<int *>(indices()->addr);
@ -189,12 +217,14 @@ void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes
&unique_sparse_grad);
int reduced_grad_size = unique_sparse_grad.indices_size_ * segment_size * sizeof(float);
MS_EXCEPTION_IF_NULL(unique_sparse_grad.value_);
auto ret = memcpy_s(gradient()->addr, gradient()->size, unique_sparse_grad.value_, reduced_grad_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
int reduced_indice_size = unique_sparse_grad.indices_size_ * sizeof(int);
MS_EXCEPTION_IF_NULL(unique_sparse_grad.indices_);
ret = memcpy_s(indices()->addr, indices()->size, unique_sparse_grad.indices_, reduced_indice_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
@ -210,10 +240,8 @@ void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes
}
void SparseOptimInfo::Reset() {
- auto &gradient = this->gradient();
- gradient->size = 0;
- auto &indices = this->indices();
- indices->size = 0;
+ gradient()->size = 0;
+ indices()->size = 0;
grads_offset_ = 0;
indices_offset_ = 0;
}

View File

@ -27,7 +27,10 @@ using mindspore::kernel::ps::SparseApplyFtrlPSKernel;
OptimizerInfo *OptimizerInfoBuilder::Build(const std::shared_ptr<PServerKernel> &pserver_kernel,
const WeightPtr &weight, const Keys &keys, const Values &values,
const Lengths &lens, const InputsShapePtr &inputs_shape, size_t worker_num) {
MS_EXCEPTION_IF_NULL(pserver_kernel);
MS_EXCEPTION_IF_NULL(inputs_shape);
OptimizerInfo *optim_info = BuildInputs(weight, keys, values, lens, inputs_shape, worker_num, pserver_kernel);
MS_EXCEPTION_IF_NULL(optim_info);
std::vector<size_t> ws_sizes = pserver_kernel->workspace_sizes();
BuildWorkspaces(optim_info, ws_sizes, worker_num);
BuildOutputs(optim_info, worker_num);
@ -39,7 +42,9 @@ void OptimizerInfoBuilder::BuildWorkspaces(OptimizerInfo *info, const std::vecto
for (size_t i = 0; i < ws_sizes.size(); i++) {
size_t size = ws_sizes[i];
AddressPtr workspace = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(workspace);
workspace->addr = new float[size];
MS_EXCEPTION_IF_NULL(workspace->addr);
workspace->size = size;
info->AddWorkspace(workspace);
}
@ -49,13 +54,11 @@ template <typename T>
AddressPtr OptimizerInfoBuilder::GenInputAddrPtr(const std::string &optim_type, const std::string &input_name,
void *ps_data, const Lengths &ps_lens,
const InputsShapePtr &inputs_shape) {
MS_EXCEPTION_IF_NULL(ps_data);
// Note that the data type may be inconsistent in ps_data.
MS_LOG(INFO) << "Get input address pointer for optimizer:" << optim_type << ", input name:" << input_name;
AddressPtr addr_ptr = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(addr_ptr);
- size_t addr_data_size = 0;
- size_t addr_data_offset = 0;
- size_t ps_index = INDEX_NOT_SEND;
if (kOptimToOriginIdx.count(optim_type) == 0 || kOptimToPSSendIdx.count(optim_type) == 0) {
MS_LOG(EXCEPTION) << "Optimizer type " << optim_type << " in not supported.";
@ -65,11 +68,12 @@ AddressPtr OptimizerInfoBuilder::GenInputAddrPtr(const std::string &optim_type,
if (ps_send_index_map.count(input_name) == 0 || origin_input_map.count(input_name) == 0) {
MS_LOG(EXCEPTION) << "Optimizer " << optim_type << " has no input for " << input_name;
}
- ps_index = ps_send_index_map.at(input_name);
+ size_t ps_index = ps_send_index_map.at(input_name);
if (ps_index == INDEX_NOT_SEND) {
MS_LOG(EXCEPTION) << "Input " << input_name << " is not supposed to be sent to PS.";
}
+ size_t addr_data_size, addr_data_offset;
if (inputs_shape != nullptr) {
// addr_data_size should be calculated by inputs_shape if it's passed.
size_t origin_index = origin_input_map.at(input_name);
@ -86,7 +90,14 @@ AddressPtr OptimizerInfoBuilder::GenInputAddrPtr(const std::string &optim_type,
T *buffer = new T[addr_data_size];
addr_ptr->size = ps_lens[ps_index] * sizeof(T);
addr_ptr->addr = buffer;
- int ret = memcpy_s(addr_ptr->addr, addr_ptr->size, reinterpret_cast<T *>(ps_data) + addr_data_offset, addr_ptr->size);
+ size_t dst_size = addr_ptr->size;
+ size_t src_size = addr_ptr->size;
+ void *dst_data = addr_ptr->addr;
+ void *src_data = reinterpret_cast<T *>(ps_data) + addr_data_offset;
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ int ret = memcpy_s(dst_data, dst_size, src_data, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] buffer;
@ -99,11 +110,14 @@ OptimizerInfo *MomentumOptimInfoBuilder::BuildInputs(const WeightPtr &weight, co
const Lengths &lens, const InputsShapePtr &inputs_shape,
size_t worker_num, const std::shared_ptr<PServerKernel> &) {
AddressPtr weight_addr = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(weight_addr);
weight_addr->addr = weight->data();
weight_addr->size = weight->size() * sizeof(float);
AddressPtr accumulate = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(accumulate);
accumulate->addr = new float[weight->size()];
MS_EXCEPTION_IF_NULL(accumulate->addr);
accumulate->size = weight->size() * sizeof(float);
int ret = memset_s(accumulate->addr, accumulate->size, 0x00, accumulate->size);
if (ret != 0) {
@ -122,25 +136,30 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
const Lengths &lens, const InputsShapePtr &inputs_shape,
size_t worker_num, const std::shared_ptr<PServerKernel> &) {
AddressPtr weight_addr = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(weight_addr);
weight_addr->addr = weight->data();
weight_addr->size = weight->size() * sizeof(float);
AddressPtr m = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(m);
m->addr = new float[weight->size()];
MS_EXCEPTION_IF_NULL(m->addr);
m->size = weight->size() * sizeof(float);
int ret = memset_s(m->addr, m->size, 0x00, m->size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<float *>(m->addr);
return nullptr;
}
AddressPtr v = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(v);
v->addr = new float[weight->size()];
MS_EXCEPTION_IF_NULL(v->addr);
v->size = weight->size() * sizeof(float);
ret = memset_s(v->addr, v->size, 0x00, v->size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
delete[] reinterpret_cast<float *>(v->addr);
delete[] reinterpret_cast<float *>(m->addr);
return nullptr;
@ -154,7 +173,6 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
AddressPtr epsilon = GenInputAddrPtr<float>(kSparseAdam, "eps", values.data(), lens);
AddressPtr grad = GenInputAddrPtr<float>(kSparseAdam, "grad", values.data(), lens, inputs_shape);
AddressPtr indices = GenInputAddrPtr<float>(kSparseAdam, "indices", values.data(), lens, inputs_shape);
return new SparseAdamOptimInfo(weight_addr, m, v, beta1_power, beta2_power, learning_rate, beta1, beta2, epsilon,
grad, indices);
}
@ -163,12 +181,16 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
const Lengths &lens, const InputsShapePtr &inputs_shape,
size_t worker_num,
const std::shared_ptr<PServerKernel> &pserver_kernel) {
MS_EXCEPTION_IF_NULL(inputs_shape);
AddressPtr weight_addr = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(weight_addr);
weight_addr->addr = weight->data();
weight_addr->size = weight->size() * sizeof(float);
AddressPtr accum = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(accum);
accum->addr = new float[weight->size()];
MS_EXCEPTION_IF_NULL(accum->addr);
accum->size = weight->size() * sizeof(float);
for (size_t i = 0; i < weight->size(); i++) {
float *tmp = reinterpret_cast<float *>(accum->addr);
@ -176,7 +198,9 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
}
AddressPtr linear = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(linear);
linear->addr = new float[weight->size()];
MS_EXCEPTION_IF_NULL(linear->addr);
int ret = memset_s(linear->addr, weight->size() * sizeof(float), 0x00, weight->size() * sizeof(float));
if (ret != 0) {
MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
@ -187,7 +211,6 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
AddressPtr grad = GenInputAddrPtr<float>(kSparseFtrl, "grad", values.data(), lens, inputs_shape);
AddressPtr indices = GenInputAddrPtr<float>(kSparseFtrl, "indices", values.data(), lens, inputs_shape);
return new SparseFtrlOptimInfo(weight_addr, accum, linear, grad, indices);
}
} // namespace ps

View File

@ -172,6 +172,7 @@ class FuncGraph;
template <typename T>
void ParameterServer<T>::ServerHandler::operator()(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
::ps::KVServer<T> *server) {
MS_EXCEPTION_IF_NULL(server);
::ps::KVPairs<T> res;
if (handlers_.count(req_meta.cmd) > 0) {
auto &handler_ptr = handlers_[req_meta.cmd];
@ -199,12 +200,14 @@ void ParameterServer<T>::ServerHandler::Init() {
template <typename T>
void ParameterServer<T>::ServerHandler::HandlePushReq(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
::ps::KVPairs<T> *res) {
MS_EXCEPTION_IF_NULL(res);
ps_->AccumGrad(req_data.keys, req_data.vals, req_data.lens);
}
template <typename T>
void ParameterServer<T>::ServerHandler::HandlePullReq(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
::ps::KVPairs<T> *res) {
MS_EXCEPTION_IF_NULL(res);
res->keys = req_data.keys;
::ps::Key key = req_data.keys[0];
res->vals = *(ps_->weight(key));
@ -214,6 +217,7 @@ template <typename T>
void ParameterServer<T>::ServerHandler::HandleInitWeights(const ::ps::KVMeta &req_meta,
const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
std::unique_lock<std::mutex> lock(ps_->mutex());
MS_EXCEPTION_IF_NULL(res);
size_t key_num = req_data.keys.size();
T *data_ptr = req_data.vals.data();
size_t pos = 0;
@ -223,10 +227,12 @@ void ParameterServer<T>::ServerHandler::HandleInitWeights(const ::ps::KVMeta &re
if (!ps_->HasWeight(key)) {
WeightPtr weight_ptr = std::make_shared<::ps::SArray<T>>();
MS_EXCEPTION_IF_NULL(weight_ptr);
weight_ptr->CopyFrom(data_ptr + pos, data_len);
ps_->InitWeight(key, weight_ptr);
GradPtr grad_ptr = std::make_shared<::ps::SArray<T>>(data_len, 0);
MS_EXCEPTION_IF_NULL(grad_ptr);
ps_->InitGrad(key, grad_ptr);
}
pos += data_len;
@ -238,6 +244,7 @@ void ParameterServer<T>::ServerHandler::HandleInitWeightToOptimId(const ::ps::KV
const ::ps::KVPairs<T> &req_data,
::ps::KVPairs<T> *res) {
std::unique_lock<std::mutex> lock(ps_->mutex());
MS_EXCEPTION_IF_NULL(res);
size_t key_num = req_data.keys.size();
for (size_t i = 0; i < key_num; i++) {
Key key = req_data.keys[i];
@ -255,6 +262,7 @@ template <typename T>
void ParameterServer<T>::ServerHandler::HandleInitInputsShape(const ::ps::KVMeta &req_meta,
const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
std::unique_lock<std::mutex> lock(ps_->mutex());
MS_EXCEPTION_IF_NULL(res);
const Key &key = req_data.keys[0];
if (init_optim_info_[key]) {
return;
@ -268,13 +276,18 @@ template <typename T>
void ParameterServer<T>::ServerHandler::HandleInitEmbeddings(const ::ps::KVMeta &req_meta,
const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
std::unique_lock<std::mutex> lock(ps_->mutex());
MS_EXCEPTION_IF_NULL(res);
const Key &key = req_data.keys[0];
MS_LOG(INFO) << "Initializing embedding table for key:" << key;
std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> shapes =
std::make_shared<std::vector<std::shared_ptr<std::vector<size_t>>>>();
MS_EXCEPTION_IF_NULL(shapes);
std::shared_ptr<std::vector<size_t>> input_shape = std::make_shared<std::vector<size_t>>();
MS_EXCEPTION_IF_NULL(input_shape);
std::shared_ptr<std::vector<size_t>> indices_shape = std::make_shared<std::vector<size_t>>();
MS_EXCEPTION_IF_NULL(indices_shape);
std::shared_ptr<std::vector<size_t>> output_shape = std::make_shared<std::vector<size_t>>();
MS_EXCEPTION_IF_NULL(output_shape);
shapes->push_back(input_shape);
shapes->push_back(indices_shape);
shapes->push_back(output_shape);
@ -297,6 +310,7 @@ template <typename T>
void ParameterServer<T>::ServerHandler::HandleCheckReadyForPush(const ::ps::KVMeta &req_meta,
const ::ps::KVPairs<T> &req_data,
::ps::KVPairs<T> *res) {
MS_EXCEPTION_IF_NULL(res);
const Key &key = req_data.keys[0];
bool ready = ps_->ReadyForPush(key);
res->keys.push_back(key);
@ -307,6 +321,7 @@ template <typename T>
void ParameterServer<T>::ServerHandler::HandleCheckReadyForPull(const ::ps::KVMeta &req_meta,
const ::ps::KVPairs<T> &req_data,
::ps::KVPairs<T> *res) {
MS_EXCEPTION_IF_NULL(res);
const Key &key = req_data.keys[0];
bool ready = ps_->ReadyForPull(key);
res->keys.push_back(key);
@ -316,6 +331,7 @@ void ParameterServer<T>::ServerHandler::HandleCheckReadyForPull(const ::ps::KVMe
template <typename T>
void ParameterServer<T>::ServerHandler::HandleEmbeddingLookup(const ::ps::KVMeta &req_meta,
const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
MS_EXCEPTION_IF_NULL(res);
const Key &key = req_data.keys[0];
for (size_t i = 1; i < req_data.keys.size(); i++) {
res->keys.push_back(req_data.keys[i]);
@ -326,6 +342,7 @@ void ParameterServer<T>::ServerHandler::HandleEmbeddingLookup(const ::ps::KVMeta
template <typename T>
void ParameterServer<T>::ServerHandler::HandleFinalize(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
::ps::KVPairs<T> *res) {
MS_EXCEPTION_IF_NULL(res);
ps_->Finalize();
}
@ -371,7 +388,9 @@ void ParameterServer<T>::InitWeightKeyToOptims(const Key &key, const int &optim_
template <typename T>
void ParameterServer<T>::InitOptimInputsShape(const Keys &keys, const Values &values, const Lengths &lengths) {
InputsShapePtr inputs_shape = std::make_shared<InputsShape>();
MS_EXCEPTION_IF_NULL(inputs_shape);
InputsShapePtr original_inputs_shape = std::make_shared<InputsShape>();
MS_EXCEPTION_IF_NULL(original_inputs_shape);
int val_idx = 0;
const Key &key = keys[0];
MS_LOG(INFO) << "Initializing optimizer inputs shape for key:" << key;
@ -381,7 +400,9 @@ void ParameterServer<T>::InitOptimInputsShape(const Keys &keys, const Values &va
}
for (size_t i = 0; i < keys.size(); i++) {
auto shape = std::make_shared<std::vector<size_t>>();
MS_EXCEPTION_IF_NULL(shape);
auto original_shape = std::make_shared<std::vector<size_t>>();
MS_EXCEPTION_IF_NULL(original_shape);
inputs_shape->push_back(shape);
original_inputs_shape->push_back(original_shape);
@ -425,6 +446,7 @@ template <typename T>
const CNodePtr ParameterServer<T>::GetCNode(const std::string &name) const {
std::list<CNodePtr> cnodes = func_graph_->GetOrderedCnodes();
for (CNodePtr cnode : cnodes) {
MS_EXCEPTION_IF_NULL(cnode);
std::string fullname = cnode->fullname_with_scope();
if (fullname.find(name) != std::string::npos && fullname.find("Push") != std::string::npos) {
return cnode;
@ -435,6 +457,7 @@ const CNodePtr ParameterServer<T>::GetCNode(const std::string &name) const {
template <typename T>
void ParameterServer<T>::InitWeight(const Key &key, const WeightPtr &weight) {
MS_EXCEPTION_IF_NULL(weight);
if ((weights_.count(key) == 0) || (is_embedding_[key] && weights_.count(key) != 0)) {
MS_LOG(INFO) << "Initializing weight for key " << key << ", server rank " << rank_id_;
weights_[key] = weight;
@ -445,6 +468,7 @@ void ParameterServer<T>::InitWeight(const Key &key, const WeightPtr &weight) {
template <typename T>
void ParameterServer<T>::InitGrad(const Key &key, const GradPtr &grad) {
MS_EXCEPTION_IF_NULL(grad);
if (grads_.count(key) == 0) {
grads_[key] = grad;
grads_accum_counter_[key] = 0;
@ -454,6 +478,7 @@ void ParameterServer<T>::InitGrad(const Key &key, const GradPtr &grad) {
template <typename T>
void ParameterServer<T>::InitEmbeddingTable(
const Key &key, const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
MS_EXCEPTION_IF_NULL(shapes);
if (weights_.count(key) == 0) {
std::shared_ptr<PServerKernel> lookup =
std::make_shared<kernel::ps::EmbeddingLookUpPSKernel>(rank_id_, pserver_num_, worker_num_);
@ -464,6 +489,7 @@ void ParameterServer<T>::InitEmbeddingTable(
const std::vector<size_t> &input_shapes = lookup->input_sizes();
size_t total_dims = std::accumulate(input_shapes.begin(), input_shapes.end(), 1, std::multiplies<size_t>());
WeightPtr embedding = std::make_shared<Weight>(total_dims, 0);
MS_EXCEPTION_IF_NULL(embedding);
T *embedding_data = embedding->data();
std::default_random_engine engine;
std::normal_distribution<float> random(0, 0.01);
@ -580,7 +606,9 @@ WeightPtr ParameterServer<T>::weight(const Key &key) {
MS_LOG(EXCEPTION) << "Invalid weight key " << key;
}
WeightPtr weight_ptr = weights_[key];
MS_EXCEPTION_IF_NULL(weight_ptr);
WeightPtr copy_weight_ptr = std::make_shared<::ps::SArray<T>>(weight_ptr->size(), 0);
MS_EXCEPTION_IF_NULL(copy_weight_ptr);
copy_weight_ptr->CopyFrom(weight_ptr->data(), weight_ptr->size());
tokens_[key] -= 1;
return copy_weight_ptr;
@ -589,6 +617,7 @@ WeightPtr ParameterServer<T>::weight(const Key &key) {
template <typename T>
void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids, ::ps::KVPairs<T> *res) {
std::unique_lock<std::mutex> lock(mutex_);
MS_EXCEPTION_IF_NULL(res);
if (weights_.count(key) == 0) {
MS_LOG(ERROR) << "Invalid embedding table key " << key;
return;
@ -598,7 +627,9 @@ void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids,
return;
}
WeightPtr table_ptr = weights_[key];
MS_EXCEPTION_IF_NULL(table_ptr);
std::shared_ptr<PServerKernel> table_lookup_op = embedding_lookup_ops_[key];
MS_EXCEPTION_IF_NULL(table_lookup_op);
// Update shapes of lookup operator
std::vector<std::vector<size_t>> shapes = {};
@ -610,13 +641,16 @@ void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids,
const std::vector<size_t> output_shapes = table_lookup_op->output_sizes();
std::vector<kernel::AddressPtr> inputs;
AddressPtr embedding_table = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(embedding_table);
AddressPtr indices = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(indices);
inputs.push_back(embedding_table);
inputs.push_back(indices);
embedding_table->addr = table_ptr->data();
embedding_table->size = table_ptr->size() * sizeof(T);
std::unique_ptr<int[]> tmp_ids(new int[lookup_ids.size()]);
MS_EXCEPTION_IF_NULL(tmp_ids);
for (size_t i = 0; i < lookup_ids.size(); i++) {
tmp_ids[i] = static_cast<int>(lookup_ids[i]);
}
@ -626,7 +660,9 @@ void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids,
std::vector<kernel::AddressPtr> workspaces;
std::vector<kernel::AddressPtr> outputs;
AddressPtr output = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(output);
std::shared_ptr<Values> addr = std::make_shared<Values>(output_shapes[0] / sizeof(T), 0);
MS_EXCEPTION_IF_NULL(addr);
output->addr = addr->data();
output->size = output_shapes[0];
@ -680,6 +716,7 @@ void ParameterServer<T>::GetEmbeddingTableParamPtr() {
auto cnodes = func_graph_->GetOrderedCnodes();
Key count = 0;
for (auto cnode : cnodes) {
MS_EXCEPTION_IF_NULL(cnode);
std::string cnode_name = AnfAlgo::GetCNodeName(cnode);
if (cnode_name == kEmbeddingLookupOpName) {
auto embedding_table = AnfAlgo::GetInputNode(cnode, 0);
@ -703,6 +740,7 @@ void ParameterServer<T>::SyncEmbeddingTables() {
std::vector<int> new_tensor_shape(input_shapes.begin(), input_shapes.end());
tensor::TensorPtr new_tensor = std::make_shared<tensor::Tensor>(kNumberTypeFloat32, new_tensor_shape);
MS_EXCEPTION_IF_NULL(new_tensor);
float *new_tensor_data_ptr = reinterpret_cast<float *>(new_tensor->data_c());
size_t new_tensor_size = static_cast<size_t>(new_tensor->data().nbytes());
size_t embedding_table_size = weights_[key]->size() * sizeof(float);
@ -710,6 +748,8 @@ void ParameterServer<T>::SyncEmbeddingTables() {
MS_LOG(EXCEPTION) << "Shape of embedding table can't match. New tensor size:" << new_tensor_size
<< ", embedding_table size:" << embedding_table_size;
}
MS_EXCEPTION_IF_NULL(new_tensor_data_ptr);
MS_EXCEPTION_IF_NULL(weights_[key]->data());
int ret = memcpy_s(new_tensor_data_ptr, new_tensor_size, weights_[key]->data(), embedding_table_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
@ -724,6 +764,7 @@ void ParameterServer<T>::SyncEmbeddingTables() {
template <typename T>
void ParameterServer<T>::Run(const FuncGraphPtr &func_graph) {
MS_EXCEPTION_IF_NULL(func_graph);
MS_LOG(INFO) << "PServer starts connecting to scheduler and workers...";
::ps::Start(0);
MS_LOG(INFO) << "PServer connected successfully.";

View File

@ -53,7 +53,7 @@ class Worker {
void SetOptimInputShapes(size_t key, const ShapeVector &shape);
void AddEmbeddingTable(const ::ps::Key &key, const size_t &row_count);
void InitPSEmbeddingTable(const std::vector<size_t> &keys, std::vector<size_t> shapes, const ShapeVector &sizes);
- void InitPSParamAndOptim(const std::string &param_name, tensor::TensorPtr tensor);
+ void InitPSParamAndOptim(const std::string &param_name, const tensor::TensorPtr &tensor);
void DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
const ::ps::SArray<int> &lens, ::ps::SArray<T> *lookup_result, int cmd);
void Finalize();
@ -132,10 +132,13 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
size_t dst_size = 0;
size_t src_size = 0;
for (size_t i = 0; i < sizes.size(); i++) {
void *dst_data = total_buffer.data() + offset / sizeof(T);
void *src_data = reinterpret_cast<void *>(addrs[i]);
MS_EXCEPTION_IF_NULL(dst_data);
MS_EXCEPTION_IF_NULL(src_data);
dst_size = sizes[i] * sizeof(T);
src_size = sizes[i] * sizeof(T);
- auto ret =
- memcpy_s(total_buffer.data() + offset / sizeof(T), dst_size, reinterpret_cast<void *>(addrs[i]), src_size);
+ auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@ -159,6 +162,7 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
template <typename T>
void Worker<T>::Pull(const size_t key, void *dev_addr, const size_t size) {
MS_EXCEPTION_IF_NULL(dev_addr);
::ps::SArray<T> variables(size / sizeof(T), 0);
while (!kv_worker_->IsReadyForPull(key)) {
continue;
@ -176,6 +180,7 @@ void Worker<T>::Pull(const size_t key, void *dev_addr, const size_t size) {
template <typename T>
void Worker<T>::DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
const ::ps::SArray<int> &lens, ::ps::SArray<T> *lookup_result, int cmd) {
MS_EXCEPTION_IF_NULL(lookup_result);
kv_worker_->EmbeddingLookup(keys, lookup_ids, lens, lookup_result, cmd);
}
@ -192,6 +197,7 @@ void Worker<T>::Finalize() {
template <typename T>
void Worker<T>::InitPSParamData(const std::vector<size_t> &keys, void *origin_addr, size_t size) {
MS_EXCEPTION_IF_NULL(origin_addr);
::ps::SArray<T> addr(reinterpret_cast<T *>(origin_addr), size / sizeof(T));
::ps::SArray<::ps::Key> key(keys);
::ps::SArray<int> lens;
@ -316,7 +322,8 @@ void Worker<T>::InitPSEmbeddingTable(const std::vector<size_t> &keys, std::vecto
}
template <typename T>
- void Worker<T>::InitPSParamAndOptim(const std::string &param_name, tensor::TensorPtr tensor) {
+ void Worker<T>::InitPSParamAndOptim(const std::string &param_name, const tensor::TensorPtr &tensor) {
MS_EXCEPTION_IF_NULL(tensor);
void *param_data = tensor->data_c();
size_t param_size = LongToSize(tensor->data().nbytes());
ShapeVector param_shape = tensor->shape_c();

View File

@ -144,6 +144,7 @@ void WorkerProxy<T>::AddEmbeddingTable(const ::ps::Key &key, const size_t &row_c
::ps::Range range(begin, end);
if (embedding_table_ranges_.count(key) == 0) {
embedding_table_ranges_[key] = std::make_shared<std::vector<::ps::Range>>();
MS_EXCEPTION_IF_NULL(embedding_table_ranges_[key]);
}
embedding_table_ranges_[key]->push_back(range);
}
@ -168,6 +169,7 @@ template <typename T>
void WorkerProxy<T>::EmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
const ::ps::SArray<int> &lens, ::ps::SArray<T> *outs, int cmd, const Callback &cb,
int priority) {
MS_EXCEPTION_IF_NULL(outs);
int ts = AddLookupCB(keys, lookup_ids, outs, cmd, cb);
::ps::KVPairs<T> kvs;
kvs.keys = keys;
@ -265,6 +267,7 @@ void WorkerProxy<T>::PushSparseData(const ::ps::SArray<::ps::Key> &keys, const :
template <typename T>
void WorkerProxy<T>::PullData(const ::ps::SArray<::ps::Key> &keys, ::ps::SArray<T> *vals, ::ps::SArray<int> *lens,
int cmd, int priority) {
MS_EXCEPTION_IF_NULL(vals);
int ts = AddGeneralRspCB(keys, vals, lens, cmd, nullptr);
::ps::KVPairs<T> kvs;
kvs.keys = keys;
@ -295,6 +298,7 @@ template <typename T>
template <typename C>
int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
C *lookup_result, int cmd, const Callback &cb) {
MS_EXCEPTION_IF_NULL(lookup_result);
int ts = lookup_customer_->NewRequest(::ps::kServerGroup);
const auto &callback = [this, ts, keys, lookup_ids, lookup_result, cb]() mutable {
mutex_.lock();
@ -310,17 +314,27 @@ int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps:
T *addr = s.vals.data() + offset;
offset += len;
id_addr_map[key] = std::make_shared<std::pair<T *, int>>(std::make_pair(addr, len));
MS_EXCEPTION_IF_NULL(id_addr_map[key]);
}
}
T *result_addr = lookup_result->data();
MS_EXCEPTION_IF_NULL(result_addr);
int offset = 0;
size_t dst_size = 0;
size_t src_size = 0;
void *dst_data = nullptr;
void *src_data = nullptr;
for (size_t i = 0; i < lookup_ids.size(); i++) {
auto &pair = id_addr_map[static_cast<Key>(lookup_ids[i])];
int size = pair->second * sizeof(T);
- size_t dst_size = size;
- size_t src_size = size;
- auto ret = memcpy_s(result_addr + offset, dst_size, pair->first, src_size);
+ dst_size = size;
+ src_size = size;
+ dst_data = result_addr + offset;
+ src_data = pair->first;
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@ -373,6 +387,7 @@ template <typename T>
void WorkerProxy<T>::LookupIdSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
const std::map<int, int> &attrs) {
MS_EXCEPTION_IF_NULL(sliced);
int *lookup_ids = send.lens.data();
size_t id_size = send.lens.size();
@ -414,6 +429,7 @@ template <typename T>
void WorkerProxy<T>::SparseSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
const std::map<int, int> &attrs) {
MS_EXCEPTION_IF_NULL(sliced);
// Init variables
T *data = send.vals.data();
@ -527,15 +543,29 @@ void WorkerProxy<T>::PrepareSparseGradient(const size_t begin, const size_t end,
const std::vector<std::pair<int, T *>> &indice_to_grads,
const int *all_indice, const size_t segment_size, T *gradient,
int *indices) {
MS_EXCEPTION_IF_NULL(all_indice);
MS_EXCEPTION_IF_NULL(gradient);
MS_EXCEPTION_IF_NULL(indices);
int offset = 0;
int index = 0;
size_t segment_data_size = segment_size * sizeof(T);
size_t dst_size = 0;
size_t src_size = 0;
void *dst_data = nullptr;
void *src_data = nullptr;
for (auto &pair : indice_to_grads) {
if (distinct_ids.count(pair.first) == 0) {
continue;
}
indices[index++] = pair.first;
- auto ret = memcpy_s(gradient + offset, segment_data_size, pair.second, segment_data_size);
+ dst_size = segment_data_size;
+ src_size = segment_data_size;
+ dst_data = gradient + offset;
+ src_data = pair.second;
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ auto ret = memcpy_s(gradient + offset, dst_size, pair.second, src_size);
if (ret != 0) {
MS_LOG(ERROR) << "memcpy_s error, errorno(" << ret << ")";
return;
@ -548,15 +578,25 @@ template <typename T>
void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const size_t grad_index,
const size_t indice_index, const T *original_data, const T *grads, int *indices,
::ps::SArray<T> *reduced_data) {
MS_EXCEPTION_IF_NULL(original_data);
MS_EXCEPTION_IF_NULL(grads);
MS_EXCEPTION_IF_NULL(indices);
MS_EXCEPTION_IF_NULL(reduced_data);
int offset = 0;
size_t dst_size = 0;
size_t src_size = 0;
void *dst_data = nullptr;
void *src_data = nullptr;
for (size_t i = 0; i < lengths.size(); i++) {
if (i != grad_index && i != indice_index) {
int data_size = lengths[i] * sizeof(T);
dst_size = data_size;
src_size = data_size;
- auto ret = memcpy_s(reduced_data->data() + offset, dst_size, original_data + offset, src_size);
+ dst_data = reduced_data->data() + offset;
+ src_data = const_cast<T *>(original_data) + offset;
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@ -573,7 +613,11 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
int data_size = lengths[grad_index] * sizeof(T);
dst_size = data_size;
src_size = data_size;
- auto ret = memcpy_s(reduced_data->data() + grad_offset, dst_size, grads, src_size);
+ dst_data = reduced_data->data() + grad_offset;
+ src_data = const_cast<T *>(grads);
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@ -585,7 +629,11 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
T *indice_data = reduced_data->data() + indice_offset;
dst_size = data_size;
src_size = data_size;
- ret = memcpy_s(indice_data, dst_size, indices, src_size);
+ dst_data = indice_data;
+ src_data = indices;
+ MS_EXCEPTION_IF_NULL(dst_data);
+ MS_EXCEPTION_IF_NULL(src_data);
+ ret = memcpy_s(dst_data, dst_size, src_data, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@ -596,6 +644,7 @@ template <typename T>
void WorkerProxy<T>::BroadcastSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
const std::map<int, int> &attr) {
MS_EXCEPTION_IF_NULL(sliced);
sliced->resize(server_num_);
for (int i = 0; i < server_num_; i++) {
sliced->at(i).first = true;
@ -608,6 +657,7 @@ template <typename T>
void WorkerProxy<T>::RoundRobinSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
const std::map<int, int> &attr) {
MS_EXCEPTION_IF_NULL(sliced);
sliced->resize(server_num_);
auto keys = send.keys;
auto vals = send.vals;
@ -646,6 +696,7 @@ void WorkerProxy<T>::WorkerInitEmbeddingSlicer(int timestamp, const ::ps::KVPair
const std::vector<::ps::Range> &,
std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
const std::map<int, int> &attrs) {
MS_EXCEPTION_IF_NULL(sliced);
sliced->resize(server_num_);
auto keys = send.keys;
auto vals = send.vals;
@ -714,6 +765,7 @@ void WorkerProxy<T>::ProcessResponse(const ::ps::Message &msg) {
template <typename T>
void WorkerProxy<T>::Send(::ps::Customer *customer, int timestamp, bool push, bool pull, int cmd,
const ::ps::KVPairs<T> &kvs, const Slicer &slicer, std::map<int, int> attrs) {
MS_EXCEPTION_IF_NULL(customer);
SlicedKVs sliced;
slicer(timestamp, kvs, ::ps::Postoffice::Get()->GetServerKeyRanges(), &sliced, attrs);

View File

@ -639,14 +639,17 @@ def set_ps_context(**kwargs):
Note:
Some other environment variables should also be set for parameter server training mode.
These environment variables are listed below:
.. code-block::
MS_SERVER_NUM # Server number
MS_WORKER_NUM # Worker number
MS_SCHED_HOST # Scheduler IP address
MS_SCHED_PORT # Scheduler port
MS_ROLE # The role of this process:
- MS_SCHED represents the scheduler,
- MS_WORKER represents the worker,
- MS_PSERVER represents the Server
+ # MS_SCHED represents the scheduler,
+ # MS_WORKER represents the worker,
+ # MS_PSERVER represents the Server
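For example, a job with one server and two workers would launch every process with MS_SERVER_NUM=1, MS_WORKER_NUM=2, and the scheduler's MS_SCHED_HOST and MS_SCHED_PORT, while MS_ROLE differs per process: MS_PSERVER for the server, MS_WORKER for each worker, and MS_SCHED for the scheduler.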
Args: