!32834 [bugfix] recovery directory creation conflict

Merge pull request !32834 from zyli2020/master
This commit is contained in:
i-robot 2022-04-12 01:52:53 +00:00 committed by Gitee
commit 39a5ec97d0
No known key found for this signature in database
GPG Key ID: 173E9B9CA92EEF8F
3 changed files with 59 additions and 35 deletions

View File

@ -174,15 +174,17 @@ void FileIOUtils::CreateDirRecursive(const std::string &dir_path, mode_t mode) {
#if defined(_WIN32) || defined(_WIN64)
int32_t ret = mkdir(tmp_dir_path);
if (ret != 0) {
MS_LOG(EXCEPTION) << "Failed to create directory recursion: " << dir_path << ". Errno = " << errno;
}
#else
int32_t ret = mkdir(tmp_dir_path, mode);
if (ret == 0) {
ChangeFileMode(tmp_dir_path, mode);
}
#endif
if (ret != 0) {
} else if (errno != EEXIST) {
MS_LOG(EXCEPTION) << "Failed to create directory recursion: " << dir_path << ". Errno = " << errno;
}
#endif
}
}
}

View File

@ -101,41 +101,24 @@ void RecoveryContext::Initialize() {
MS_LOG(EXCEPTION) << "Role name '" << node_role_ << "' is invalid. ";
}
// 2. Create config json file.
if (node_role_ == distributed::kEnvRoleOfScheduler) {
if (!FileIOUtils::IsFileOrDirExist(recovery_path_)) {
FileIOUtils::CreateDirRecursive(recovery_path_);
}
auto ret = FileUtils::GetRealPath(recovery_path_.c_str());
if (!ret.has_value()) {
MS_LOG(EXCEPTION) << "Cannot get real path of persistent storage path: " << recovery_path_;
}
recovery_path_ = ret.value();
if (!FileIOUtils::IsFileOrDirExist(recovery_path_ + kConfigJson)) {
nlohmann::json config_js;
config_js[std::string(ps::kStoreType)] = 1;
config_js[std::string(ps::kStoreFilePath)] = recovery_path_ + "/" + ps::kStoreFilePath + kJsonSuffix;
config_js[std::string(ps::kSchedulerStoreFilePath)] =
recovery_path_ + "/" + ps::kSchedulerStoreFilePath + kJsonSuffix;
nlohmann::json recovery_js;
recovery_js[std::string(ps::kKeyRecovery)] = config_js;
std::ofstream config_file(recovery_path_ + kConfigJson);
config_file << recovery_js.dump();
config_file.close();
}
// 2. Get real recovery path and create config file.
if (!FileIOUtils::IsFileOrDirExist(recovery_path_)) {
FileIOUtils::CreateDirRecursive(recovery_path_);
}
// 3. Worker or Server need to wait the recovery config json file to be created.
while (!FileIOUtils::IsFileOrDirExist(recovery_path_ + kConfigJson)) {
// Wait duration: 200ms.
const int kWaitDuration = 200;
std::this_thread::sleep_for(std::chrono::milliseconds(kWaitDuration));
auto ret = FileUtils::GetRealPath(recovery_path_.c_str());
if (!ret.has_value()) {
MS_LOG(EXCEPTION) << "Cannot get real path of persistent storage path: " << recovery_path_;
}
recovery_path_ = ret.value();
std::string config_file_path = recovery_path_ + kConfigJson;
if (!FileIOUtils::IsFileOrDirExist(config_file_path)) {
CreateConfigFile(config_file_path);
}
// 4. Set config content to PSContext.
ps::PSContext::instance()->set_config_file_path(recovery_path_ + kConfigJson);
// 3. Set config content to PSContext.
ps::PSContext::instance()->set_config_file_path(config_file_path);
ps::PSContext::instance()->set_node_id(common::GetEnv(distributed::cluster::topology::kEnvNodeId));
initialized_ = true;
@ -300,6 +283,42 @@ void RecoveryContext::ParseLatestCkptInfo(const int *recv_buffer, const uint32_t
latest_ckpt_file_ = GetCkptPath() + "/" + *iter;
}
// Create the recovery config json file at `config_file_path` if it does not exist yet.
//
// The file is opened with O_CREAT | O_EXCL, so when the creation is attempted more than once for the same path only
// the caller whose open() succeeds writes the content; a caller that gets EEXIST just logs and returns.
//
// Failures are reported through MS_LOG(EXCEPTION):
//   - open() fails with an errno other than EEXIST;
//   - write() does not write the whole config content.
void RecoveryContext::CreateConfigFile(const std::string &config_file_path) {
  if (FileIOUtils::IsFileOrDirExist(config_file_path)) {
    MS_LOG(WARNING) << "The config file exists, file path: " << config_file_path;
    return;
  }

  int fd = open(config_file_path.c_str(), O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
  if (fd == -1) {
    // Snapshot errno immediately so that nothing executed while building the log message can change it.
    const int open_errno = errno;
    if (open_errno != EEXIST) {
      MS_LOG(EXCEPTION) << "Create config file: [" << config_file_path << "] failed, errno: " << open_errno << ", "
                        << strerror(open_errno);
    }
    MS_LOG(INFO) << "The config file is already created, file path: " << config_file_path;
    return;
  }

  // Build the config content: { kKeyRecovery: { store type, store file path, scheduler store file path } }.
  nlohmann::json config_js;
  config_js[std::string(ps::kStoreType)] = 1;
  config_js[std::string(ps::kStoreFilePath)] = recovery_path_ + "/" + ps::kStoreFilePath + kJsonSuffix;
  config_js[std::string(ps::kSchedulerStoreFilePath)] =
    recovery_path_ + "/" + ps::kSchedulerStoreFilePath + kJsonSuffix;

  nlohmann::json recovery_js;
  recovery_js[std::string(ps::kKeyRecovery)] = config_js;

  std::string config_content = recovery_js.dump();
  auto ret_size = write(fd, config_content.c_str(), config_content.size());
  // Capture the error code BEFORE close(): close() may fail on its own and overwrite errno, which would make the
  // message below report the wrong cause.
  // NOTE(review): for a short write (0 < ret_size < size) write() does not set errno, so `err` may be stale then.
  errno_t err = (ret_size == 0) ? EOF : errno;
  // The descriptor is released exactly once, on both the success and the failure path.
  (void)close(fd);

  if (ret_size != SizeToLong(config_content.size())) {
    MS_LOG(EXCEPTION) << "Write config file: [" << config_file_path << "] failed, errno: " << err << ", "
                      << strerror(err);
  }
}
void RecoveryContext::CreatePersistentFile() {
std::unique_lock<std::mutex> lock(create_persist_json_mtx_);
if (node_role_ == distributed::kEnvRoleOfScheduler) {

View File

@ -102,7 +102,10 @@ class BACKEND_EXPORT RecoveryContext {
// Initialize recovery context.
void Initialize();
// Create persistent json file, used to persist recovery config.
// Create config json file, used to persist node info of cluster.
void CreateConfigFile(const std::string &config_file_path);
// Create persistent json file, used to persist recovery config of Worker, such as ckpt path.
void CreatePersistentFile();
// Obtain the step corresponding to the local latest checkpoint in each training process.