!48404 Bugfix: PS server cannot exit

Merge pull request !48404 from zyli2020/bug_fix
This commit is contained in:
i-robot 2023-02-05 08:45:39 +00:00 committed by Gitee
commit 7cddb2c437
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
5 changed files with 29 additions and 5 deletions

View File

@ -169,7 +169,7 @@ void EmbeddingCachePrefetchActor::Initialize() {
initialized_ = true;
}
void EmbeddingCachePrefetchActor::Finalize() {
void EmbeddingCachePrefetchActor::Finalize(bool finalize_remote) {
std::lock_guard<std::mutex> lock(finalize_mutex_);
if (!initialized_ || finalized_) {
return;
@ -178,7 +178,9 @@ void EmbeddingCachePrefetchActor::Finalize() {
running_ = false;
PsDataPrefetch::GetInstance().NotifyFinalize();
(void)FinalizeRemote();
if (finalize_remote) {
(void)FinalizeRemote();
}
data_parser_.notify_all();

View File

@ -99,7 +99,7 @@ class EmbeddingCachePrefetchActor : public ActorBase {
void SyncEmbeddingTable();
// Finalize embedding cache prefetch actor and push latest embedding from local cache to remote cache.
void Finalize();
void Finalize(bool finalize_remote);
// Wait for the computed graph to finish the current step when there is not enough free memory space in the cache,
// in order to delete the feature vectors used by the current step from the cache.

View File

@ -89,13 +89,28 @@ void MuxRecvActor::ParseFinalizeReqData(size_t data_len, const MessageBase *cons
}
}
void MuxRecvActor::Clear() {
Finalize();
RecvActor::Clear();
}
void MuxRecvActor::Finalize() {
std::unique_lock<std::mutex> lock(context_mtx_);
if (finalized_) {
return;
}
finalized_ = true;
is_context_valid_ = true;
op_context_ = nullptr;
context_cv_.notify_all();
}
void MuxRecvActor::StopRpcAtException() {
std::unique_lock<std::mutex> lock(context_mtx_);
is_exception_thrown_ = true;
context_cv_.notify_all();
}
} // namespace runtime
} // namespace mindspore

View File

@ -41,9 +41,15 @@ class MuxRecvActor : public RecvActor {
// Get the from actor aid of received message.
const AID &from_actor_aid() const { return from_actor_aid_; }
// Finalize mux recv actor gracefully.
// Clear resource of mux recv actor.
void Clear() override;
// Stop mux recv actor gracefully.
void Finalize() override;
// Stop rpc communication to avoid dead lock after exception is thrown.
void StopRpcAtException() override;
private:
// Set the message handler of the server.
void SetMessageHandler() override;

View File

@ -453,7 +453,8 @@ void EmbeddingCacheScheduler::Finalize(bool sync_embedding_table) {
MS_EXCEPTION_IF_NULL(embedding_cache_prefetch_actor_);
// Stop the embedding cache prefetch_actor.
embedding_cache_prefetch_actor_->Finalize();
bool finalize_remote = sync_embedding_table;
embedding_cache_prefetch_actor_->Finalize(finalize_remote);
embedding_cache_table_manager.Finalize();