fixed heartbeat timeout

This commit is contained in:
chendongsheng 2021-07-09 17:09:21 +08:00
parent b02dbcffaf
commit 060ca81c91
2 changed files with 6 additions and 6 deletions

View File

@ -347,12 +347,11 @@ std::pair<uint32_t, uint64_t> AbstractNode::CollectiveReceiveAsync(const enum No
auto res = received_data_[std::make_pair(rank_id, rank_request_id)];
if (*output != nullptr) {
MS_LOG(WARNING) << "The output is not empty.";
} else {
*output = res;
received_data_.erase(std::make_pair(rank_id, rank_request_id));
receive_messages_done_[std::make_pair(rank_id, rank_request_id)] = true;
MS_LOG(DEBUG) << "Receive data from rank id:" << rank_id << ", the rank request id is:" << rank_request_id;
}
*output = res;
received_data_.erase(std::make_pair(rank_id, rank_request_id));
receive_messages_done_[std::make_pair(rank_id, rank_request_id)] = true;
MS_LOG(DEBUG) << "Receive data from rank id:" << rank_id << ", the rank request id is:" << rank_request_id;
};
}
receive_callbacks_mutex_.unlock();
@ -363,6 +362,7 @@ bool AbstractNode::CollectiveWait(std::pair<uint32_t, uint64_t> request_id, cons
std::unique_lock<std::mutex> lock(receive_callbacks_mutex_);
bool res =
receive_cond_.wait_for(lock, std::chrono::seconds(timeout), [&] { return receive_messages_done_[request_id]; });
receive_messages_done_[request_id] = true;
return res;
}

View File

@ -70,7 +70,7 @@ class Node {
bool Wait(uint64_t request_id, const uint32_t &timeout = kCommTimeoutInSeconds);
bool SendMessageSync(const std::shared_ptr<TcpClient> &client, std::shared_ptr<MessageMeta>, const Protos &,
const void *, size_t size, const uint32_t &timeout = kTimeoutInSeconds);
const void *, size_t size, const uint32_t &timeout = kCommTimeoutInSeconds);
protected:
bool WaitForStart(const uint32_t &timeout);