forked from mindspore-Ecosystem/mindspore
!31526 clean code check warnings
Merge pull request !31526 from zyli2020/master
commit 040b570c01
@@ -143,7 +143,7 @@ bool CollectiveManager::InitDeviceCommGroup(const CommunicationGroupPtr &group,
   bool init_group_success = false;
   bool init_group_fail = false;
   std::condition_variable thread_blocker;
-  init_group_thread_ = std::make_unique<std::thread>([&] {
+  init_group_thread_ = std::make_unique<std::thread>([&, this] {
     device_ctx_->Initialize();
     if (!group->Initialize(root_info)) {
       MS_LOG(ERROR) << "Initialize group on the device side failed.";
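The only change in this hunk is the capture list: [&] becomes [&, this]. The lambda body touches members through the object, and an implicit this capture hides that lifetime dependency; many static checkers ask for an explicit this, and with [=] the implicit capture of this is deprecated since C++20. A minimal standalone sketch of the pattern (class and member names here are illustrative, not taken from the MindSpore sources):

#include <memory>
#include <thread>

// Illustrative class, not part of the MindSpore code base.
class GroupInitializer {
 public:
  void StartInit() {
    // Listing `this` explicitly next to `&` makes the lambda's dependency on
    // the object's lifetime visible to readers and to static checkers.
    init_thread_ = std::make_unique<std::thread>([&, this] { initialized_ = true; });
  }
  void WaitInit() {
    if (init_thread_ && init_thread_->joinable()) {
      init_thread_->join();
    }
  }

 private:
  std::unique_ptr<std::thread> init_thread_;
  bool initialized_ = false;
};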
@@ -48,7 +48,7 @@ void LocalFile::Write(const std::vector<InputData> &inputs, const DirtyInfo &dir
     TransformDirtyInfoToBlockIndices(dirty_info, &block_indices);

     for (const auto &block_index : block_indices) {
-      WriteOneBlockFile(block_index, inputs);
+      WriteOneBlockFile(IntToSize(block_index), inputs);
     }
     return;
   }
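IntToSize is presumably the repository's checked int-to-size_t conversion helper; wrapping the signed block index silences the implicit signed-to-unsigned conversion warning at the call site. The sketch below is an illustrative stand-in showing the intent of such a checked cast, not the project's actual implementation:

#include <cstddef>
#include <stdexcept>

// Illustrative stand-in for a checked signed-to-unsigned conversion: reject
// negative values instead of letting them wrap around to a huge size_t.
inline size_t IntToSizeSketch(int value) {
  if (value < 0) {
    throw std::out_of_range("negative value cannot be converted to size_t");
  }
  return static_cast<size_t>(value);
}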
@@ -55,9 +55,9 @@ class LocalFile : public StorageBase {
   // 1. Create blocks and block metas.
   // 2. Write input data to block files and Generate sha256 sequence for every block file.
   // Write the entire blob data of tensor to the block files on disk:
-  void Write(const InputData &input, const DirtyInfo &dirty_info = {}) override;
+  void Write(const InputData &input, const DirtyInfo &dirty_info) override;
   // Write the entire blob data composed of multiple tensors to the block files on disk:
-  void Write(const std::vector<InputData> &inputs, const DirtyInfo &dirty_info = {}) override;
+  void Write(const std::vector<InputData> &inputs, const DirtyInfo &dirty_info) override;

   // The following two methods are override version function for Read:
   // 1.Tamper proof check.
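Both Write overrides in LocalFile drop the = {} default for dirty_info. This follows the usual clean-code rule against default arguments on virtual functions: the default that applies is chosen from the static type of the object expression, not the dynamic type, so base and derived defaults can silently diverge. The base-class side of the same change is in the StorageBase hunk below, followed by a small illustration of the pitfall.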
@@ -43,15 +43,15 @@ class StorageBase {
   virtual ~StorageBase() = default;

   // Write input tensor to storage medium or memory buffer.
-  // The parameter dirty_info is optional, indicating that the part of the Tensor that needs to be rewritten to storage,
+  // The parameter dirty_info indicates that the part of the Tensor that needs to be rewritten to storage,
   // for example, some rows of embedding table need to be rewritten to storage, the dirty_info should contain these row
   // numbers.
-  virtual void Write(const InputData &input, const DirtyInfo &dirty_info = {}) {}
+  virtual void Write(const InputData &input, const DirtyInfo &dirty_info) {}

   // Write input to storage medium or memory buffer, only support the input composed of multiple tensors with same shape
   // and data type and using same dirty info at present.
-  // The parameter dirty_info is optional, indicating that the part of the Tensor that needs to be rewritten to storage.
-  virtual void Write(const std::vector<InputData> &input, const DirtyInfo &dirty_info = {}) {}
+  // The parameter dirty_info indicates that the part of the Tensor that needs to be rewritten to storage.
+  virtual void Write(const std::vector<InputData> &input, const DirtyInfo &dirty_info) {}

   // Read data from the storage medium or memory buffer and merge them into contiguous memory.
   virtual void Read(const OutputData &output) {}
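A minimal, self-contained illustration of why checkers flag default arguments on virtual functions (types and values here are invented for the example): the default is resolved from the static type at the call site, while the body is dispatched dynamically, so a derived-class default would not be the one used through a base pointer.

#include <iostream>

struct Base {
  virtual ~Base() = default;
  virtual void Write(int dirty = 0) { std::cout << "Base::Write " << dirty << "\n"; }
};

struct Derived : Base {
  // Deliberately re-declares a different default to make the pitfall visible.
  void Write(int dirty = 42) override { std::cout << "Derived::Write " << dirty << "\n"; }
};

int main() {
  Derived d;
  Base *b = &d;
  b->Write();  // Prints "Derived::Write 0": Base's default, Derived's body.
  d.Write();   // Prints "Derived::Write 42": Derived's own default applies here.
  return 0;
}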
@@ -24,13 +24,16 @@ namespace device {
 std::mutex StreamSynchronizer::instance_lock_;
 std::shared_ptr<StreamSynchronizer> StreamSynchronizer::instance_ = nullptr;

-StreamSynchronizer::~StreamSynchronizer() {
+StreamSynchronizer::~StreamSynchronizer() noexcept {
   {
     std::unique_lock<std::mutex> lock(task_mutex_);
     stop_ = true;
   }
   do_sync_stream_cv_.notify_all();
-  worker_thread_.join();
+  if (worker_thread_.joinable()) {
+    worker_thread_.join();
+  }
+  device_context_ = nullptr;
 }

 bool StreamSynchronizer::SyncStream(const std::string &device_name, uint32_t timeout) {
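Two things change in the destructor: it is spelled noexcept explicitly (destructors are implicitly noexcept in almost all cases, so this mainly documents the guarantee and keeps the header declaration consistent), and join() is guarded by joinable(), since calling join() on a thread that was never started or was already joined throws std::system_error, which would escape the noexcept destructor and terminate the process. A minimal sketch of the same shutdown pattern, with illustrative names:

#include <condition_variable>
#include <mutex>
#include <thread>

// Illustrative sketch, not the MindSpore class.
class StoppableWorker {
 public:
  StoppableWorker() : worker_([this] { Run(); }) {}

  ~StoppableWorker() noexcept {
    {
      // Set the stop flag under the lock so the worker cannot miss the wake-up.
      std::unique_lock<std::mutex> lock(mutex_);
      stop_ = true;
    }
    cv_.notify_all();
    // join() on a non-joinable thread throws; guard it so this noexcept
    // destructor can never be the source of a terminate().
    if (worker_.joinable()) {
      worker_.join();
    }
  }

 private:
  void Run() {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return stop_; });
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  bool stop_ = false;
  std::thread worker_;
};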
@@ -70,7 +73,10 @@ bool StreamSynchronizer::SyncStream(const std::string &device_name, uint32_t tim
   } else {
     sync_stream_time_out_ = true;
     runtime::recovery::RecoveryContext::GetInstance()->set_need_reinit_collective(true);
-    distributed::collective::CollectiveManager::instance()->Finalize();
+    if (!distributed::collective::CollectiveManager::instance()->Finalize()) {
+      MS_LOG(ERROR) << "Finalize collective manager failed.";
+      return false;
+    }
     time_out_cv_.wait(lock, [this]() { return device_context_ == nullptr; });
     MS_LOG(WARNING) << "Synchronize stream time out.";
     return true;
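The unchecked call to Finalize() is the kind of ignored bool return that clean-code checks flag; the fix logs and propagates the failure instead of pressing on. A tiny sketch of the pattern with invented names:

#include <iostream>

// Stand-in for a bool-returning cleanup step; not the MindSpore API.
bool FinalizeSketch(bool ok) { return ok; }

bool ShutdownSketch(bool ok) {
  // Check the result rather than discarding it: log and propagate the failure.
  if (!FinalizeSketch(ok)) {
    std::cerr << "Finalize collective manager failed." << std::endl;
    return false;
  }
  return true;
}

int main() { return ShutdownSketch(true) ? 0 : 1; }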
@@ -42,7 +42,7 @@ class StreamSynchronizer {
     return instance_;
   }

-  ~StreamSynchronizer();
+  ~StreamSynchronizer() noexcept;

   // Execute synchronization stream with timeout mechanism.
   bool SyncStream(const std::string &device_name, uint32_t timeout = kTimeoutInSeconds);
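The header declaration picks up the same noexcept specifier as the out-of-line definition in the .cc hunk above; redeclarations of a function must not carry conflicting exception specifications, so the two files are kept in sync. In miniature, with an illustrative class name:

// Illustrative class, shown only to make the point about matching specifiers.
class Example {
 public:
  ~Example() noexcept;  // declaration in the header
};

Example::~Example() noexcept {}  // out-of-line definition repeats the specifier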