add file storage module

This commit is contained in:
lizhenyu 2021-11-02 20:52:39 +08:00
parent 5233c73805
commit a736171895
11 changed files with 988 additions and 0 deletions

View File

@ -0,0 +1,113 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MIINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_DATA_H_
#define MIINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_DATA_H_
#include <map>
#include <memory>
#include <vector>
#include <string>
#include <thread>
#include <utility>
#include "distributed/persistent/storage/local_file.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace distributed {
namespace persistent {
// The Data class keeps the memory buffer of a tensor together with its shape. Both are held through shared
// pointers, and the class is the base of PersistentData, which adds persistence and disaster recovery.
//
// Either pointer may be null (the shape defaults to nullptr). The accessors never dereference a null buffer:
// data() yields nullptr and size() yields 0 in that case.
template <typename T>
class Data {
 public:
  explicit Data(const std::shared_ptr<std::vector<T>> &data, const std::shared_ptr<std::vector<int>> &shape = nullptr)
      : data_(data), shape_(shape) {}
  virtual ~Data() = default;

  // Get the raw pointer to the first element of the buffer, or nullptr when no buffer is attached.
  T *data() const { return data_ == nullptr ? nullptr : data_->data(); }

  // Get the shared buffer itself, so that the caller can modify or resize it. May be null.
  std::shared_ptr<std::vector<T>> MutableData() const { return data_; }

  // Get the element number of the buffer (not the byte size), 0 when no buffer is attached.
  size_t size() const { return data_ == nullptr ? 0 : data_->size(); }

  // Get the dimension information of Data. May be null when no shape was given.
  std::shared_ptr<std::vector<int>> shape() const { return shape_; }

 protected:
  // Container used to store continuous memory buffer of Data.
  std::shared_ptr<std::vector<T>> data_;

  // Container used to record the dimension information of Data which persists a tensor.
  std::shared_ptr<std::vector<int>> shape_;
};
// PersistentData extends Data<T> with a storage handle so that the tensor buffer can be written to, and read back
// from, persistent storage (persistence and disaster tolerance).
// Initialize() creates the storage handle; Persist() and Restore() both check the handle with
// MS_EXCEPTION_IF_NULL, so Initialize() has to be called first.
template <typename T>
class PersistentData : public Data<T> {
public:
explicit PersistentData(const std::shared_ptr<std::vector<T>> &data,
const std::shared_ptr<std::vector<int>> &shape = nullptr)
: Data<T>(data, shape) {}
~PersistentData() override = default;
// Initialize storage module: creates a storage::LocalFile from 'storage_config' and keeps it as the storage handle.
void Initialize(const std::map<std::string, std::string> &storage_config);
// In disaster recovery mode, memory of tensor need to be saved into disk file periodically.
// Passes the shape, the buffer pointer and the buffer byte size to the storage handle together with 'dirty_info'.
void Persist(const storage::DirtyInfo &dirty_info) const;
// In disaster recovery mode, server node or worker node need to restore persistent data when restart.
// Asks the storage handle to read into the existing buffer (pointer and byte size); the buffer is not resized.
void Restore() const;
private:
// The following variables are used in disaster recovery mode:
// The threads used to execute persistence task.
// NOTE(review): no code visible in this header starts or joins this thread.
std::thread persist_thread_;
// The file storage handle used to persist data, null until Initialize() is called.
std::shared_ptr<storage::StorageBase> storage_;
};
// Create the storage handle from 'storage_config'. The handle is always a storage::LocalFile, any handle that was
// set before is replaced.
template <typename T>
void PersistentData<T>::Initialize(const std::map<std::string, std::string> &storage_config) {
  auto local_file = std::make_shared<storage::LocalFile>(storage_config);
  storage_ = std::move(local_file);
}
// Write the tensor held by this object to the storage handle.
// 'dirty_info' is forwarded unchanged to the storage handle's Write.
// Raises through MS_EXCEPTION_IF_NULL when Initialize() has not been called or when no shape was given
// (the shape defaults to nullptr in the constructor and is required to build the storage input).
template <typename T>
void PersistentData<T>::Persist(const storage::DirtyInfo &dirty_info) const {
  MS_EXCEPTION_IF_NULL(storage_);
  // Members inherited from the dependent base Data<T> are only found by name lookup when qualified with 'this->'.
  MS_EXCEPTION_IF_NULL(this->shape_);
  // The size handed to the storage is the byte size of the buffer, not the element number.
  storage::InputData input = std::make_tuple(*(this->shape_), this->data(), this->size() * sizeof(T));
  storage_->Write(input, dirty_info);
}
// Read the persisted content back into the buffer held by this object.
// The buffer is used as is: the storage handle receives its pointer and its byte size, the buffer is not resized.
// Raises through MS_EXCEPTION_IF_NULL when Initialize() has not been called.
template <typename T>
void PersistentData<T>::Restore() const {
  MS_EXCEPTION_IF_NULL(storage_);
  // Members inherited from the dependent base Data<T> are only found by name lookup when qualified with 'this->'.
  storage::OutputData output = std::make_pair(this->data(), this->size() * sizeof(T));
  storage_->Read(output);
}
} // namespace persistent
} // namespace distributed
} // namespace mindspore
#endif // MIINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_DATA_H_

View File

@ -0,0 +1,41 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "distributed/persistent/storage/block.h"
#include "utils/system/sha256.h"
#include "utils/log_adapter.h"
#include "utils/utils.h"
namespace mindspore {
namespace distributed {
namespace storage {
// Compute the hash of the block file with system::sha256::GetHashFromFile and store it in the block meta under the
// key kHashSeq. The block meta must have been set with set_block_meta() before.
void Block::GenSha256Seq() const {
  const std::string file_digest = system::sha256::GetHashFromFile(block_file_name_);
  MS_EXCEPTION_IF_NULL(block_meta_);
  block_meta_->Insert(kHashSeq, file_digest);
}
// Verify the integrity of the block file: the hash stored in the block meta under kHashSeq is compared with the
// hash computed from the current content of the block file.
// Returns true when both hashes are equal, false (after logging an error) when the file content has changed.
// The block meta must have been set with set_block_meta() before.
bool Block::CheckSha256Seq() const {
  MS_EXCEPTION_IF_NULL(block_meta_);
  std::string sha256_gen = block_meta_->Get<std::string>(kHashSeq);
  if (sha256_gen != system::sha256::GetHashFromFile(block_file_name_)) {
    MS_LOG(ERROR) << "The block file has been modified, file name: " << block_file_name_;
    // A mismatch has to be reported to the caller, otherwise the check can never fail.
    return false;
  }
  return true;
}
} // namespace storage
} // namespace distributed
} // namespace mindspore

View File

@ -0,0 +1,64 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_BLOCK_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_BLOCK_H_
#include <memory>
#include <string>
#include "distributed/persistent/storage/json_utils.h"
#include "nlohmann/json.hpp"
#include "distributed/persistent/storage/constants.h"
namespace mindspore {
namespace distributed {
namespace storage {
// The meta information of a block is stored as key-value pairs in a json file, handled by JsonUtils.
// The content of the meta information can be customized, such as shard shape, shard range, field length, etc.
using BlockMeta = JsonUtils;
// Class Block corresponds to one block file: it saves the path of the block file and a pointer to the meta
// information of the block, and provides block file integrity verification based on a hash stored in that meta.
// The block meta has to be attached with set_block_meta() before the hash methods are used.
class Block {
public:
// 'block_name' is the path of the block file, it is stored as is.
explicit Block(const std::string &block_name) : block_file_name_(block_name) {}
~Block() = default;
// The following two methods are used for the file integrity check.
// Generate the sha256 hash sequence of the block file and record it in the block meta.
void GenSha256Seq() const;
// Check the sha256 hash sequence: compares the hash recorded in the block meta with the hash of the block file.
bool CheckSha256Seq() const;
// Set the block meta pointer associated with the block file.
void set_block_meta(const std::shared_ptr<BlockMeta> &block_meta) { block_meta_ = block_meta; }
// Get the block file path (the path of the block file itself, not of its meta file).
const std::string &block_file_name() const { return block_file_name_; }
private:
// The block meta information corresponding to the block, null until set_block_meta() is called.
std::shared_ptr<BlockMeta> block_meta_;
// The block file path.
std::string block_file_name_;
};
} // namespace storage
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_BLOCK_H_

View File

@ -0,0 +1,43 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_CONSTANTS_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_CONSTANTS_H_
namespace mindspore {
namespace distributed {
namespace storage {
// Block and BlockMeta related.
// Keys of the meta information stored for every block.
constexpr char kFieldsLength[] = "field_length";
constexpr char kOffset[] = "offset";
constexpr char kShardShape[] = "shard_shape";
constexpr char kShardRangeLowerBound[] = "shard_range_lower_bound";
constexpr char kShardRangeUpperBound[] = "shard_range_upper_bound";
constexpr char kHashSeq[] = "hash_seq";
// Naming of the block files and block meta files: "block_<index>" and "block_meta_<index>.json".
constexpr char kBlockFilePrefix[] = "block_";
constexpr char kBlockMetaFilePrefix[] = "block_meta_";
constexpr char kJsonSuffix[] = ".json";
// Length of kJsonSuffix without the terminating '\0'. Derived from the literal so that both cannot get out of sync.
constexpr size_t JSON_SUFFIX_LENS = sizeof(kJsonSuffix) - 1;
// Storage config related.
// Keys of the storage configuration map.
constexpr char kFileStoragePath[] = "file_storage_path";
constexpr char kMaxBlockLength[] = "max_block_length";
} // namespace storage
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_CONSTANTS_H_

View File

@ -0,0 +1,125 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "distributed/persistent/storage/file_io_utils.h"
#include <fstream>
#include "utils/log_adapter.h"
namespace mindspore {
namespace distributed {
namespace storage {
namespace {
bool CheckFStreamLength(const std::string &file_name, std::fstream &fs, size_t size) {
size_t cur_pos = fs.tellp();
fs.seekp(0, std::ios::end);
if (!fs.good() || fs.fail() || fs.bad()) {
MS_LOG(ERROR) << "Failed to seedp file pos, file name: " << file_name;
return false;
}
size_t end_pos = fs.tellp();
if (end_pos - cur_pos < size) {
MS_LOG(ERROR) << "The content length of file:" << file_name << " is less than expected size: " << size;
return false;
}
fs.seekp(cur_pos);
if (!fs.good() || fs.fail() || fs.bad()) {
MS_LOG(ERROR) << "Failed to seedp file pos, file name: " << file_name;
return false;
}
return true;
}
} // namespace
// Write all buffers of 'inputs' one after another into the file 'file_name'.
// The file is opened for binary output; every element of 'inputs' is a (pointer, byte size) pair. The stream is
// flushed after every buffer.
// Returns false, after logging an error, when the file name is empty, the file cannot be opened, or a write or
// flush leaves the stream in a non-good state. A null buffer pointer is handled by MS_ERROR_IF_NULL.
bool FileIOUtils::Write(const std::string &file_name, const std::vector<std::pair<const void *, size_t>> &inputs) {
  if (file_name.empty()) {
    MS_LOG(ERROR) << "The file name is empty";
    return false;
  }

  std::fstream fs(file_name, std::ios::out | std::ios::binary);
  if (!fs.is_open() || !fs.good()) {
    MS_LOG(ERROR) << "Open file failed, file name: " << file_name;
    return false;
  }

  for (auto iter = inputs.cbegin(); iter != inputs.cend(); ++iter) {
    const void *data = iter->first;
    MS_ERROR_IF_NULL(data);

    fs.write(static_cast<const char *>(data), iter->second);
    // good() is false as soon as any of the error bits is set, this covers fail() and bad().
    if (!fs.good()) {
      fs.close();
      MS_LOG(ERROR) << "Insert data to fstream failed.";
      return false;
    }

    fs.flush();
    if (!fs.good()) {
      fs.close();
      MS_LOG(ERROR) << "Insert data to fstream failed.";
      return false;
    }
  }

  fs.close();
  return true;
}
// Read the file 'file_name' sequentially into the buffers of 'outputs'.
// The file is opened for binary input; every element of 'outputs' is a (pointer, byte size) pair that is filled
// with the next 'byte size' bytes of the file. Before every read the remaining length of the file is checked.
// Returns false, after logging an error, when the file name is empty, the file cannot be opened, the file is
// shorter than requested, or a read leaves the stream in a non-good state. A null buffer pointer is handled by
// MS_ERROR_IF_NULL.
bool FileIOUtils::Read(const std::string &file_name, const std::vector<std::pair<void *, size_t>> &outputs) {
  if (file_name.empty()) {
    MS_LOG(ERROR) << "The file name is empty";
    return false;
  }

  std::fstream fs(file_name, std::ios::in | std::ios::binary);
  if (!fs.is_open() || !fs.good()) {
    MS_LOG(ERROR) << "Open file failed, file name: " << file_name;
    return false;
  }

  for (auto iter = outputs.cbegin(); iter != outputs.cend(); ++iter) {
    void *data = iter->first;
    MS_ERROR_IF_NULL(data);

    const size_t size = iter->second;
    if (!CheckFStreamLength(file_name, fs, size)) {
      return false;
    }

    fs.read(static_cast<char *>(data), size);
    // good() is false as soon as any of the error bits is set, this covers fail() and bad().
    if (!fs.good()) {
      fs.close();
      MS_LOG(ERROR) << "Read data from fstream failed.";
      return false;
    }
  }

  fs.close();
  return true;
}
// Judge whether a file exists: the file counts as existing when an input stream opened on it is in a good state.
// The temporary stream is closed by its destructor.
bool FileIOUtils::IsFileExist(const std::string &file) { return std::ifstream(file.c_str()).good(); }
} // namespace storage
} // namespace distributed
} // namespace mindspore

View File

@ -0,0 +1,42 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_FILE_IO_UTILS_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_FILE_IO_UTILS_H_
#include <vector>
#include <string>
#include <utility>
namespace mindspore {
namespace distributed {
namespace storage {
// Collection of static helpers for reading and writing binary files.
// Buffers are described as (pointer, byte size) pairs; several buffers are stored one after another in a file.
class FileIOUtils {
public:
// Write memory buffers to the file in overwriting mode, create a new file if the file does not exist.
// Returns false when the file name is empty, the file cannot be opened or writing fails.
static bool Write(const std::string &file_name, const std::vector<std::pair<const void *, size_t>> &inputs);
// Read the file and load its content into the memory buffers, return false if the file does not exist.
// Also returns false when the file name is empty, the file is shorter than the requested sizes or reading fails.
static bool Read(const std::string &file_name, const std::vector<std::pair<void *, size_t>> &outputs);
// Judge whether a file exists.
static bool IsFileExist(const std::string &file);
};
} // namespace storage
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_FILE_IO_UTILS_H_

View File

@ -0,0 +1,46 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "distributed/persistent/storage/json_utils.h"
#include "distributed/persistent/storage/file_io_utils.h"
#include "utils/utils.h"
namespace mindspore {
namespace distributed {
namespace storage {
// Load the json file 'file_name_' into memory, or create it when it does not exist yet.
// A newly created file is empty and its mode is changed with ChangeFileMode(S_IRUSR | S_IWUSR).
// Returns false, after logging an error, when an existing file cannot be parsed as json, true otherwise.
bool JsonUtils::Initialize() {
  if (FileIOUtils::IsFileExist(file_name_)) {
    std::ifstream json_file(file_name_);
    try {
      json_file >> js_;
    } catch (nlohmann::json::exception &e) {
      json_file.close();
      const std::string illegal_exception = e.what();
      MS_LOG(ERROR) << "Parse json file:" << file_name_ << " failed, the exception:" << illegal_exception;
      return false;
    }
    json_file.close();
    return true;
  }

  // The file does not exist: create an empty one.
  std::ofstream output_file(file_name_);
  output_file.close();
  ChangeFileMode(file_name_, S_IRUSR | S_IWUSR);
  return true;
}
} // namespace storage
} // namespace distributed
} // namespace mindspore

View File

@ -0,0 +1,75 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_JSON_UTILS_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_JSON_UTILS_H_
#include <fstream>
#include <string>
#include "utils/json_operation_utils.h"
#include "nlohmann/json.hpp"
namespace mindspore {
namespace distributed {
namespace storage {
// This class uses json format to store and obtain a large number of key-value pairs, supports creating or opening json
// files, reading or modifying key-value in json.
// The key-value pairs are kept in memory in a json object; Insert() additionally writes the whole json object to the
// file 'file_name_'.
class JsonUtils {
public:
// 'file_name' is the path of the json file, the file is not touched until Initialize() or Insert() is called.
explicit JsonUtils(const std::string &file_name) : file_name_(file_name) {}
~JsonUtils() = default;
// Load or create a json file. Returns false when an existing file cannot be parsed.
bool Initialize();
// Get the value corresponding to the key in json, raises through MS_LOG(EXCEPTION) when the key does not exist.
template <typename T>
T Get(const std::string &key) const;
// Insert a key-value pair into json or change the value corresponding to the key in json.
// The json file is rewritten on every call.
template <typename T>
void Insert(const std::string &key, const T &value);
private:
// Json object.
nlohmann::json js_;
// The json file path.
std::string file_name_;
};
// Return the value stored under 'key', converted to T by GetJsonValue.
// Raises through MS_LOG(EXCEPTION) when the json object does not contain 'key'.
template <typename T>
T JsonUtils::Get(const std::string &key) const {
  const bool key_found = js_.contains(key);
  if (!key_found) {
    MS_LOG(EXCEPTION) << "The key:" << key << " is not exist.";
  }
  return GetJsonValue<T>(js_, key);
}
template <typename T>
void JsonUtils::Insert(const std::string &key, const T &value) {
std::ofstream output_file(file_name_);
js_[key] = value;
output_file << js_.dump();
output_file.close();
}
} // namespace storage
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_JSON_UTILS_H_

View File

@ -0,0 +1,262 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "distributed/persistent/storage/local_file.h"
#include <dirent.h>
#include <cmath>
#include <algorithm>
#include <numeric>
#include <tuple>
#include <utility>
#include "utils/convert_utils_base.h"
#include "utils/log_adapter.h"
#include "utils/utils.h"
#include "distributed/persistent/storage/constants.h"
namespace mindspore {
namespace distributed {
namespace storage {
// Write a single tensor: the input is wrapped into a one-element list and handed to the multi-tensor overload.
void LocalFile::Write(const InputData &input, const DirtyInfo &dirty_info) {
  Write(std::vector<InputData>(1, input), dirty_info);
}
// Write the tensors of 'inputs' to the block files.
// On the first call the block files and block metas are created and everything is written. On later calls only
// the blocks which contain a value of 'dirty_info' are rewritten.
// Raises through MS_LOG(EXCEPTION) when 'inputs' is empty.
void LocalFile::Write(const std::vector<InputData> &inputs, const DirtyInfo &dirty_info) {
  if (inputs.empty()) {
    MS_LOG(EXCEPTION) << "The inputs is empty";
  }

  if (!finish_create_block_files_) {
    // Create block files and write inputs_data to block files.
    WriteBlockFiles(inputs);
    return;
  }

  // The block files exist already, only the blocks related to the dirty information need to be rewritten.
  std::vector<int> block_indices;
  TransformDirtyInfoToBlockIndices(dirty_info, &block_indices);
  for (size_t i = 0; i < block_indices.size(); ++i) {
    WriteOneBlockFile(block_indices[i], inputs);
  }
}
// Translate the dirty values of 'dirty_info' into the indices of the blocks which contain them.
// A block contains a dirty value when the value lies in [kShardRangeLowerBound, kShardRangeUpperBound) of the
// block meta. Every affected block index is appended to 'block_indices' exactly once, in ascending order.
// 'dirty_info' needs to be sorted in ascending order, the blocks are walked through only once.
// Raises through MS_LOG(EXCEPTION) / MS_EXCEPTION_IF_NULL when 'block_indices' is null, when there is no block
// meta, or when a dirty value is not covered by the blocks (out of range or not in ascending order).
void LocalFile::TransformDirtyInfoToBlockIndices(const DirtyInfo &dirty_info, std::vector<int> *block_indices) const {
  MS_EXCEPTION_IF_NULL(block_indices);
  if (block_meta_list_.empty()) {
    MS_LOG(EXCEPTION) << "The block meta list is empty";
  }

  size_t block_index = 0;
  auto block_meta_ptr = block_meta_list_.at(block_index);
  MS_EXCEPTION_IF_NULL(block_meta_ptr);
  int cur_lower_bound = block_meta_ptr->Get<int>(kShardRangeLowerBound);
  int cur_upper_bound = block_meta_ptr->Get<int>(kShardRangeUpperBound);
  // Whether the index of the current block has been appended to 'block_indices' already.
  bool cur_block_recorded = false;

  for (const auto &dirty_value : dirty_info) {
    // Move forward to the block whose range ends behind the dirty value.
    while (dirty_value >= cur_upper_bound) {
      if (++block_index >= block_meta_list_.size()) {
        // Without this check an index that does not belong to any block would be handed to the caller.
        MS_LOG(EXCEPTION) << "The dirty value[" << dirty_value << "] is beyond the range of all block files.";
      }
      block_meta_ptr = block_meta_list_.at(block_index);
      MS_EXCEPTION_IF_NULL(block_meta_ptr);
      cur_lower_bound = block_meta_ptr->Get<int>(kShardRangeLowerBound);
      cur_upper_bound = block_meta_ptr->Get<int>(kShardRangeUpperBound);
      cur_block_recorded = false;
    }
    if (dirty_value < cur_lower_bound) {
      MS_LOG(EXCEPTION) << "The dirty value[" << dirty_value << "] is below the range of block[" << block_index
                        << "], the dirty info must be in range and sorted in ascending order.";
    }
    if (!cur_block_recorded) {
      block_indices->push_back(static_cast<int>(block_index));
      cur_block_recorded = true;
    }
  }
}
// Split the tensors of 'inputs' along the first dimension into blocks, create a block meta and a block for every
// slice, and write all block files.
// The split is computed from the first input only (shape and byte size); every block file stores the same slice
// of all inputs, and the slice is chosen such that a block file does not exceed 'max_block_length_' bytes.
// For every block the meta records the row range [lower bound, upper bound), the byte length of the slice of one
// input (kFieldsLength) and the byte offset of the slice inside an input buffer (kOffset).
// Raises through MS_LOG(EXCEPTION) when 'inputs' is empty, when the first dimension or the byte size of a row is
// zero, or when not even a single row of all inputs fits into a block.
void LocalFile::WriteBlockFiles(const std::vector<InputData> &inputs) {
  if (inputs.empty()) {
    MS_LOG(EXCEPTION) << "The inputs is empty";
  }

  const std::vector<int> &shape = std::get<0>(inputs.front());
  size_t first_dim = 0;
  if (shape.size() > 0) {
    first_dim = IntToSize(shape[0]);
  }
  if (first_dim == 0) {
    MS_LOG(EXCEPTION) << "The dimension of input shape contain zero.";
  }

  // Byte size of one row, i.e. of everything behind the first dimension.
  size_t non_first_dims_size = std::get<2>(inputs.front()) / first_dim;
  if (non_first_dims_size == 0) {
    MS_LOG(EXCEPTION) << "The size of input tensor is zero.";
  }

  size_t tensor_num = inputs.size();
  // Number of rows per block. Integer division rounds down exactly; floating point arithmetic would lose
  // precision for large sizes.
  const size_t slice_size = max_block_length_ / tensor_num / non_first_dims_size;
  if (slice_size == 0) {
    MS_LOG(EXCEPTION) << "The slice size in block is zero.";
  }

  // Rounded up division: the last block may hold fewer rows than the others.
  const size_t block_num = first_dim / slice_size + (first_dim % slice_size != 0 ? 1 : 0);

  size_t offset = 0;
  for (size_t block_index = 0; block_index < block_num; ++block_index) {
    // Create block meta.
    auto block_meta_ptr =
      std::make_shared<BlockMeta>(file_path_ + "/" + kBlockMetaFilePrefix + std::to_string(block_index) + kJsonSuffix);
    // The result is not checked here: all keys read from the meta later on are inserted again below.
    (void)block_meta_ptr->Initialize();

    size_t cur_lower_bound = slice_size * block_index;
    block_meta_ptr->Insert(kShardRangeLowerBound, cur_lower_bound);

    size_t cur_upper_bound = std::min(cur_lower_bound + slice_size, first_dim);
    block_meta_ptr->Insert(kShardRangeUpperBound, cur_upper_bound);

    size_t field_length = (cur_upper_bound - cur_lower_bound) * non_first_dims_size;
    block_meta_ptr->Insert(kFieldsLength, field_length);

    block_meta_ptr->Insert(kOffset, offset);
    offset += field_length;
    block_meta_list_.push_back(block_meta_ptr);

    // Create block.
    auto block_ptr = std::make_shared<Block>(file_path_ + "/" + kBlockFilePrefix + std::to_string(block_index));
    block_ptr->set_block_meta(block_meta_ptr);
    block_list_.push_back(block_ptr);
  }
  finish_create_block_files_ = true;

  // Write inputs_data to block files and Gen Sha256 seq.
  for (size_t block_index = 0; block_index < block_num; ++block_index) {
    WriteOneBlockFile(block_index, inputs);
  }
}
// Rewrite the block file with the index 'block_index' and regenerate its hash.
// The slice of every input that belongs to the block (kOffset and kFieldsLength of the block meta, both in bytes)
// is written to the block file, one input after another. Afterwards the file mode is changed with
// ChangeFileMode(S_IRUSR | S_IWUSR) and the sha256 hash sequence is generated.
// Raises when 'block_index' is out of range (std::vector::at), when the block or its meta is null, or when
// writing the block file fails.
void LocalFile::WriteOneBlockFile(size_t block_index, const std::vector<InputData> &inputs) const {
  const auto &block_meta_ptr = block_meta_list_.at(block_index);
  MS_EXCEPTION_IF_NULL(block_meta_ptr);
  size_t field_size = block_meta_ptr->Get<size_t>(kFieldsLength);
  size_t offset = block_meta_ptr->Get<size_t>(kOffset);

  std::vector<std::pair<const void *, size_t>> block_inputs_data;
  for (size_t input_index = 0; input_index < inputs.size(); ++input_index) {
    const void *data_ptr = reinterpret_cast<const char *>(std::get<1>(inputs.at(input_index))) + offset;
    size_t data_size = field_size;
    block_inputs_data.emplace_back(data_ptr, data_size);
  }

  const auto &block_ptr = block_list_.at(block_index);
  MS_EXCEPTION_IF_NULL(block_ptr);
  // Rewrite the current block file. FileIOUtils::Write returns true on success, only a false result is an error.
  if (!FileIOUtils::Write(block_ptr->block_file_name(), block_inputs_data)) {
    MS_LOG(EXCEPTION) << "Write to block file[" << block_ptr->block_file_name() << "] failed.";
  }

  ChangeFileMode(block_ptr->block_file_name(), S_IRUSR | S_IWUSR);

  // Generate sha256 hash sequence.
  block_ptr->GenSha256Seq();
}
// Read a single tensor: the output is wrapped into a one-element list and handed to the multi-tensor overload.
void LocalFile::Read(const OutputData &output) { Read(std::vector<OutputData>(1, output)); }
void LocalFile::Read(const std::vector<OutputData> &outputs) {
if (block_list_.empty() || block_meta_list_.empty()) {
// Load file list info of block files and block meta files in the current folder to block list and block meta list.
if (!LoadBlocksInfo()) {
MS_LOG(EXCEPTION) << "LoadBlocksInfo failed";
}
}
// Read all block files.
for (size_t block_index = 0; block_index < block_list_.size(); ++block_index) {
std::vector<std::pair<void *, size_t>> block_output_data;
const auto &block_meta_ptr = block_meta_list_[block_index];
MS_EXCEPTION_IF_NULL(block_meta_ptr);
size_t field_size = block_meta_ptr->Get<size_t>(kFieldsLength);
size_t offset = block_meta_ptr->Get<size_t>(kOffset);
for (size_t output_index = 0; output_index < outputs.size(); ++output_index) {
void *data_ptr = reinterpret_cast<char *>(std::get<1>(outputs[output_index])) + offset;
size_t data_size = field_size;
block_output_data.emplace_back(data_ptr, data_size);
}
const auto &block_ptr = block_list_[block_index];
MS_EXCEPTION_IF_NULL(block_ptr);
if (block_ptr->CheckSha256Seq()) {
MS_LOG(EXCEPTION) << "CheckSha256 failed, file name [" << block_ptr->block_file_name() << "]";
}
FileIOUtils::Read(block_ptr->block_file_name(), block_output_data);
}
}
bool LocalFile::LoadBlocksInfo() {
DIR *dir = opendir(file_path_.c_str());
if (dir == nullptr) {
MS_LOG(ERROR) << "The file path [" << file_path_ << "] is not exist";
return false;
}
std::vector<std::string> block_file_name_list;
std::vector<std::string> block_meta_file_name_list;
struct dirent *entry;
// Get file names of all block file and block meta file in the current folder.
while ((entry = readdir(dir)) != nullptr) {
std::string file_name = entry->d_name;
if (file_name.length() <= JSON_SUFFIX_LENS) {
continue;
}
auto suffix = file_name.substr(file_name.length() - JSON_SUFFIX_LENS);
if (suffix == kJsonSuffix) {
block_meta_file_name_list.push_back(file_name);
} else {
block_file_name_list.push_back(file_name);
}
}
(void)closedir(dir);
if (block_file_name_list.size() != block_meta_file_name_list.size()) {
MS_LOG(ERROR) << "The block file number[" << block_file_name_list.size()
<< "] is not equal to block meta file number[" << block_meta_file_name_list.size() << "]";
return false;
}
sort(block_file_name_list.begin(), block_file_name_list.end());
for (size_t i = 0; i < block_file_name_list.size(); i++) {
auto block_meta_ptr = std::make_shared<BlockMeta>(block_meta_file_name_list[i]);
block_meta_ptr->Initialize();
block_meta_list_.push_back(block_meta_ptr);
auto block_ptr = std::make_shared<Block>(block_file_name_list[i]);
block_ptr->set_block_meta(block_meta_ptr);
block_list_.push_back(block_ptr);
}
return true;
}
} // namespace storage
} // namespace distributed
} // namespace mindspore

View File

@ -0,0 +1,104 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_LOCAL_FILE_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_LOCAL_FILE_H_
#include <map>
#include <memory>
#include <string>
#include <vector>
#include "distributed/persistent/storage/storage.h"
#include "distributed/persistent/storage/block.h"
#include "distributed/persistent/storage/file_io_utils.h"
namespace mindspore {
namespace distributed {
namespace storage {
// Maximum length of a block file which is used when the storage config does not specify one: 128MB, in bytes.
constexpr size_t DEFAULT_MAX_BLOCK_LENGTH = static_cast<size_t>(128) * 1024 * 1024;
// Storage implementation which persists tensors into files on the local disk.
// The data is split into several block files inside one folder; every block file comes with a block meta file.
class LocalFile : public StorageBase {
 public:
  // Read the settings from 'storage_config':
  // - kFileStoragePath: the folder of the block files, left empty when the key is missing.
  // - kMaxBlockLength: the maximum length of a block file, parsed with std::stoul; DEFAULT_MAX_BLOCK_LENGTH is
  //   used when the key is missing or its value is empty.
  explicit LocalFile(const std::map<std::string, std::string> &storage_config) {
    const auto path_entry = storage_config.find(kFileStoragePath);
    if (path_entry != storage_config.end()) {
      file_path_ = path_entry->second;
    }

    const auto length_entry = storage_config.find(kMaxBlockLength);
    const bool has_block_length = length_entry != storage_config.end() && !length_entry->second.empty();
    max_block_length_ = has_block_length ? std::stoul(length_entry->second) : DEFAULT_MAX_BLOCK_LENGTH;
  }

  ~LocalFile() override = default;

  // Write, for one tensor and for several tensors. Both overloads:
  // 1. Create the blocks and block metas, if that has not been done yet.
  // 2. Write the input data to the block files and generate the sha256 sequence of every written block file.

  // Write the blob data of one tensor to the block files on disk.
  void Write(const InputData &input, const DirtyInfo &dirty_info = {}) override;

  // Write the blob data composed of multiple tensors to the block files on disk.
  void Write(const std::vector<InputData> &inputs, const DirtyInfo &dirty_info = {}) override;

  // Read, for one tensor and for several tensors. Both overloads:
  // 1. Run the tamper proof check of the block files.
  // 2. Read all block files and merge them into contiguous memory.

  // Read the data of one tensor from all block files in the folder 'file_path_'.
  void Read(const OutputData &output) override;

  // Read the data of multiple tensors from all block files in the folder 'file_path_'.
  void Read(const std::vector<OutputData> &outputs) override;

 private:
  // Create the blocks and block metas and write the input data to the block files.
  void WriteBlockFiles(const std::vector<InputData> &inputs);

  // Write the sharded data to the block file with the given block index and generate its sha256 sequence.
  void WriteOneBlockFile(size_t block_index, const std::vector<InputData> &inputs) const;

  // Determine the indices of the blocks which are affected by the dirty info, only these blocks need to be
  // rewritten. The dirty info needs to be sorted in ascending order.
  void TransformDirtyInfoToBlockIndices(const DirtyInfo &dirty_info, std::vector<int> *block_indices) const;

  // Fill the block list and the block meta list from the block files and block meta files in 'file_path_'.
  bool LoadBlocksInfo();

  // One Block object in memory for every block file of the local file.
  std::vector<std::shared_ptr<Block>> block_list_;

  // The meta info of every block of 'block_list_', at the same position. The meta info can be customized,
  // such as shard shape, shard range, field length, etc.
  std::vector<std::shared_ptr<BlockMeta>> block_meta_list_;

  // Path of the folder which holds all block files.
  std::string file_path_;

  // Maximum size of each block file.
  size_t max_block_length_;

  // Whether the block files have been created already.
  bool finish_create_block_files_{false};
};
} // namespace storage
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_LOCAL_FILE_H_

View File

@ -0,0 +1,73 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_STORAGE_H_
#define MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_STORAGE_H_
#include <map>
#include <string>
#include <vector>
#include <tuple>
#include <utility>
#include "distributed/persistent/storage/block.h"
#include "distributed/persistent/storage/constants.h"
namespace mindspore {
namespace distributed {
namespace storage {
// InputData consists of shape, const buffer pointer and size.
// Tuple positions: 0 = shape, 1 = pointer to the buffer which is read, 2 = size of the buffer.
using InputData = std::tuple<std::vector<int>, const void *, size_t>;
// OutputData consists of buffer pointer and size.
// Pair members: first = pointer to the buffer which is filled, second = size of the buffer.
using OutputData = std::pair<void *, size_t>;
// DirtyInfo is used to indicate the part of the Tensor that needs to be rewritten to storage.
using DirtyInfo = std::vector<int>;
// Storage configuration, you can choose different configurations according to different storage forms, and support
// modification, such as using file storage to configure the file storage path.
// Returns a reference to one map which lives for the whole program run. It initially holds the key
// kFileStoragePath with an empty value.
// The function is defined in a header and therefore has to be inline, otherwise every translation unit which
// includes the header emits its own definition and linking fails with a multiple definition error.
inline std::map<std::string, std::string> &Config() {
  static std::map<std::string, std::string> config = {{kFileStoragePath, ""}};
  return config;
}
// This Class provides upper-layer interfaces for persistent storage.
// All methods have an empty default implementation, a concrete storage overrides the ones it supports.
class StorageBase {
public:
StorageBase() = default;
virtual ~StorageBase() = default;
// Write input tensor to storage medium or memory buffer.
// The parameter dirty_info is optional, indicating that the part of the Tensor that needs to be rewritten to storage,
// for example, some rows of embedding table need to be rewritten to storage, the dirty_info should contain these row
// numbers.
virtual void Write(const InputData &input, const DirtyInfo &dirty_info = {}) {}
// Write input to storage medium or memory buffer, only support the input composed of multiple tensors with same shape
// and data type and using same dirty info at present.
// The parameter dirty_info is optional, indicating that the part of the Tensor that needs to be rewritten to storage.
virtual void Write(const std::vector<InputData> &input, const DirtyInfo &dirty_info = {}) {}
// Read data from the storage medium or memory buffer and merge them into contiguous memory.
// 'output' describes the buffer which receives the data (pointer and size).
virtual void Read(const OutputData &output) {}
// Read data from the storage medium or memory buffer and merge them into contiguous memory for multiple tensors.
// 'outputs' describes one receiving buffer (pointer and size) per tensor.
virtual void Read(const std::vector<OutputData> &outputs) {}
};
} // namespace storage
} // namespace distributed
} // namespace mindspore
#endif // MINDSPORE_CCSRC_DISTRIBUTED_PERSISTENT_STORAGE_STORAGE_H_