diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_arena.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_arena.cc
index e358ac3573d..76bc0b4ecd8 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_arena.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_arena.cc
@@ -18,8 +18,7 @@ namespace mindspore {
 namespace dataset {
 CachedSharedMemoryArena::CachedSharedMemoryArena(int32_t port, size_t val_in_GB)
-    : Arena::Arena(val_in_GB * 1024), port_(port), shmid_(-1) {}
-
+    : ptr_(nullptr), val_in_GB_(val_in_GB), port_(port), shmid_(-1) {}
 CachedSharedMemoryArena::~CachedSharedMemoryArena() {
 #if CACHE_LOCAL_CLIENT
   if (this->ptr_ != nullptr && this->ptr_ != reinterpret_cast<void *>(-1)) {
@@ -54,18 +53,18 @@ Status CachedSharedMemoryArena::CreateArena(std::unique_ptr
-  ba->shmid_ = shmget(shm_key, ba->size_in_bytes_, IPC_CREAT | IPC_EXCL | access_mode);
+  // Value is in GB. Convert into bytes.
+  int64_t sz = val_in_GB * 1073741824L;
+  ba->shmid_ = shmget(shm_key, sz, IPC_CREAT | IPC_EXCL | access_mode);
   if (ba->shmid_) {
     ba->ptr_ = shmat(ba->shmid_, nullptr, 0);
     if (ba->ptr_ == reinterpret_cast<void *>(-1)) {
       RETURN_STATUS_UNEXPECTED("Shared memory attach failed. Errno " + std::to_string(errno));
     }
+    ba->impl_ = std::make_unique<ArenaImpl>(ba->ptr_, sz);
   } else {
     RETURN_STATUS_UNEXPECTED("Shared memory creation failed. Errno " + std::to_string(errno));
   }
-  uint64_t num_blks = ba->size_in_bytes_ / ARENA_BLK_SZ;
-  MS_LOG(DEBUG) << "Size of memory pool is " << num_blks << ", number of blocks of size is " << ARENA_BLK_SZ << ".";
-  ba->tr_.Insert(0, num_blks);
 #endif
   return Status::OK();
 }
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_arena.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_arena.h
index 3d0ba7cbbc4..d0f41588da0 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_arena.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_arena.h
@@ -17,14 +17,18 @@
 #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_CACHE_ARENA_H_
 #include
+#include
 #include
 #include "minddata/dataset/util/arena.h"
 #include "minddata/dataset/engine/cache/cache_common.h"
 namespace mindspore {
 namespace dataset {
 /// This is a derived class of Arena but resides in shared memory
-class CachedSharedMemoryArena : public Arena {
+class CachedSharedMemoryArena : public MemoryPool {
  public:
+  // Disable copy and assignment constructor
+  CachedSharedMemoryArena(const CachedSharedMemoryArena &) = delete;
+  CachedSharedMemoryArena &operator=(const CachedSharedMemoryArena &) = delete;
   ~CachedSharedMemoryArena() override;
   /// \brief Create an Arena in shared memory
   /// \param[out] p_ba Pointer to a unique_ptr
@@ -39,11 +43,41 @@ class CachedSharedMemoryArena : public Arena {
   /// in the client. So instead we will return an address relative
   /// to the base address of the shared memory where we attach to.
   /// \return Base address of the shared memory.
-  const void *SharedMemoryBaseAddr() const { return this->ptr_; }
+  const void *SharedMemoryBaseAddr() const { return impl_->get_base_addr(); }
+
+  /// As a derived class of MemoryPool, we have to implement the following
+  /// But we simply transfer the call to the implementation class
+  Status Allocate(size_t size, void **pVoid) override {
+    std::unique_lock<std::mutex> lock(mux_);
+    return impl_->Allocate(size, pVoid);
+  }
+  Status Reallocate(void **pVoid, size_t old_sz, size_t new_sz) override {
+    std::unique_lock<std::mutex> lock(mux_);
+    return impl_->Reallocate(pVoid, old_sz, new_sz);
+  }
+  void Deallocate(void *pVoid) override {
+    std::unique_lock<std::mutex> lock(mux_);
+    impl_->Deallocate(pVoid);
+  }
+  uint64_t get_max_size() const override { return impl_->get_max_size(); }
+  int PercentFree() const override {
+    std::unique_lock<std::mutex> lock(mux_);
+    return impl_->PercentFree();
+  }
+
+  /// \brief Dump the memory allocation block.
+  friend std::ostream &operator<<(std::ostream &os, const CachedSharedMemoryArena &s) {
+    os << *(s.impl_);
+    return os;
+  }
 
  private:
+  mutable std::mutex mux_;
+  void *ptr_;
+  int32_t val_in_GB_;
   int32_t port_;
   int shmid_;
+  std::unique_ptr<ArenaImpl> impl_;
   /// Private constructor. Not to be called directly.
   CachedSharedMemoryArena(int32_t port, size_t val_in_GB);
 };
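Not part of the patch, but for orientation: a minimal sketch of how the reworked class is expected to be driven through the MemoryPool interface it now implements. The CreateArena argument order (out-pointer, port, size in GB) is assumed from the private constructor above, since the signature line is truncated in the hunk; the port and size values are illustrative only.

```cpp
// Sketch only; assumes CreateArena(&arena, port, val_in_GB) per the private constructor above.
Status DemoSharedArena() {
  std::unique_ptr<CachedSharedMemoryArena> arena;
  RETURN_IF_NOT_OK(CachedSharedMemoryArena::CreateArena(&arena, 50052, 4));  // hypothetical port, 4 GB segment
  void *p = nullptr;
  RETURN_IF_NOT_OK(arena->Allocate(1024, &p));  // forwarded to the ArenaImpl inside the shm segment, under mux_
  // Hand out an offset rather than the raw pointer: the segment attaches at a
  // different virtual address in every process.
  ptrdiff_t offset = reinterpret_cast<const char *>(p) - reinterpret_cast<const char *>(arena->SharedMemoryBaseAddr());
  MS_LOG(DEBUG) << "Offset " << offset << ", free " << arena->PercentFree() << "%";
  arena->Deallocate(p);
  return Status::OK();
}
```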
diff --git a/mindspore/ccsrc/minddata/dataset/util/allocator.h b/mindspore/ccsrc/minddata/dataset/util/allocator.h
index 82bb157052c..744844aba8a 100644
--- a/mindspore/ccsrc/minddata/dataset/util/allocator.h
+++ b/mindspore/ccsrc/minddata/dataset/util/allocator.h
@@ -40,6 +40,7 @@ class Allocator {
   using reference = T &;
   using const_reference = const T &;
   using size_type = uint64_t;
+  using difference_type = std::ptrdiff_t;
 
   template <typename U>
   struct rebind {
@@ -86,8 +87,30 @@ class Allocator {
  private:
   std::shared_ptr<MemoryPool> pool_;
 };
-/// \brief It is a wrapper of unique_ptr with a custom allocator and acts like std::lock_guard such that the memory will
-/// be released when the object goes out of scope
+/// \brief It is a wrapper of unique_ptr with a custom Allocator class defined above
+template <typename T, typename... Args>
+Status MakeUnique(std::unique_ptr<T[], std::function<void(T *)>> *out, Allocator<T> alloc, size_t n, Args &&... args) {
+  RETURN_UNEXPECTED_IF_NULL(out);
+  CHECK_FAIL_RETURN_UNEXPECTED(n > 0, "size must be positive");
+  T *data = alloc.allocate(n);
+  if (!std::is_arithmetic<T>::value) {
+    for (auto i = 0; i < n; i++) {
+      std::allocator_traits<Allocator<T>>::construct(alloc, &(data[i]), std::forward<Args>(args)...);
+    }
+  }
+  auto deleter = [](T *p, Allocator<T> f_alloc, size_t f_n) {
+    if (!std::is_arithmetic<T>::value && std::is_destructible<T>::value) {
+      for (auto i = 0; i < f_n; ++i) {
+        std::allocator_traits<Allocator<T>>::destroy(f_alloc, &p[i]);
+      }
+    }
+    f_alloc.deallocate(p, f_n);
+  };
+  *out = std::unique_ptr<T[], std::function<void(T *)>>(data, std::bind(deleter, std::placeholders::_1, alloc, n));
+  return Status::OK();
+}
+
+/// \brief It is a wrapper of the above custom unique_ptr with some additional methods
 /// \tparam T The type of object to be allocated
 /// \tparam C Allocator. Default to std::allocator
 template <typename T, typename C = std::allocator<T>>
 class MemGuard {
@@ -113,14 +136,7 @@ class MemGuard {
   /// \brief Explicitly deallocate the memory if allocated
   void deallocate() {
     if (ptr_) {
-      auto *p = ptr_.release();
-      if (!std::is_arithmetic<T>::value && std::is_destructible<T>::value) {
-        for (auto i = 0; i < n_; ++i) {
-          p[i].~T();
-        }
-      }
-      alloc_.deallocate(p, n_);
-      n_ = 0;
+      ptr_.reset();
     }
   }
   /// \brief Allocate memory (with emplace feature). Previous one will be released. If size is 0, no new memory is
@@ -129,24 +145,9 @@ class MemGuard {
   /// \tparam Args Extra arguments pass to the constructor of T
   template <typename... Args>
   Status allocate(size_t n, Args &&... args) noexcept {
-    try {
-      deallocate();
-      if (n > 0) {
-        T *data = alloc_.allocate(n);
-        if (!std::is_arithmetic<T>::value) {
-          for (auto i = 0; i < n; i++) {
-            std::allocator_traits<C>::construct(alloc_, &(data[i]), std::forward<Args>(args)...);
-          }
-        }
-        ptr_ = std::unique_ptr<T[]>(data);
-        n_ = n;
-      }
-    } catch (const std::bad_alloc &e) {
-      return Status(StatusCode::kOutOfMemory);
-    } catch (std::exception &e) {
-      RETURN_STATUS_UNEXPECTED(e.what());
-    }
-    return Status::OK();
+    deallocate();
+    n_ = n;
+    return MakeUnique(&ptr_, alloc_, n, std::forward<Args>(args)...);
   }
   ~MemGuard() noexcept { deallocate(); }
   /// \brief Getter function
@@ -170,7 +171,7 @@ class MemGuard {
  private:
   size_t n_;
   allocator alloc_;
-  std::unique_ptr<T[]> ptr_;
+  std::unique_ptr<T[], std::function<void(T *)>> ptr_;
 };
 }  // namespace dataset
 }  // namespace mindspore
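Not part of the patch: a small sketch of how the new MakeUnique helper is intended to be used on its own (MemGuard::allocate now simply forwards to it, as the hunk above shows). The function name, the int element type and the count of 16 are illustrative; only MakeUnique, Allocator and MemoryPool come from the header.

```cpp
// Sketch: allocate an array from an arbitrary MemoryPool; the deleter bound by
// MakeUnique destroys the elements and returns the memory to the same allocator.
Status DemoMakeUnique(std::shared_ptr<MemoryPool> pool) {
  Allocator<int> alloc(pool);
  std::unique_ptr<int[], std::function<void(int *)>> buf;
  RETURN_IF_NOT_OK(MakeUnique(&buf, alloc, 16));  // default-constructs 16 ints via the allocator
  buf[0] = 42;
  return Status::OK();  // buf goes out of scope; the deleter hands the block back to the pool
}
```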
diff --git a/mindspore/ccsrc/minddata/dataset/util/arena.cc b/mindspore/ccsrc/minddata/dataset/util/arena.cc
index 87a9c614a83..85ce35e6610 100644
--- a/mindspore/ccsrc/minddata/dataset/util/arena.cc
+++ b/mindspore/ccsrc/minddata/dataset/util/arena.cc
@@ -33,21 +33,19 @@ struct MemHdr {
     *hdr = *tmp;
   }
 };
-Status Arena::Init() {
-  RETURN_IF_NOT_OK(DeMalloc(size_in_MB_ * 1048576L, &ptr_, false));
+
+ArenaImpl::ArenaImpl(void *ptr, size_t sz) : size_in_bytes_(sz), ptr_(ptr) {
   // Divide the memory into blocks. Ignore the last partial block.
   uint64_t num_blks = size_in_bytes_ / ARENA_BLK_SZ;
   MS_LOG(DEBUG) << "Size of memory pool is " << num_blks << ", number of blocks of size is " << ARENA_BLK_SZ << ".";
   tr_.Insert(0, num_blks);
-  return Status::OK();
 }
-Status Arena::Allocate(size_t n, void **p) {
+Status ArenaImpl::Allocate(size_t n, void **p) {
   if (n == 0) {
     *p = nullptr;
     return Status::OK();
   }
-  std::unique_lock<std::mutex> lck(mux_);
   // Round up n to 1K block
   uint64_t req_size = static_cast<uint64_t>(n) + ARENA_WALL_OVERHEAD_SZ;
   if (req_size > this->get_max_size()) {
@@ -64,7 +62,6 @@ Status Arena::Allocate(size_t n, void **p) {
     if (size > reqBlk) {
       tr_.Insert(addr + reqBlk, size - reqBlk);
     }
-    lck.unlock();
     char *q = static_cast<char *>(ptr_) + addr * ARENA_BLK_SZ;
     MemHdr::setHdr(q, addr, reqBlk);
     *p = get_user_addr(q);
@@ -74,14 +71,24 @@ Status Arena::Allocate(size_t n, void **p) {
   return Status::OK();
 }
-void Arena::Deallocate(void *p) {
+std::pair<std::pair<uint64_t, uint64_t>, bool> ArenaImpl::FindPrevBlk(uint64_t addr) {
+  for (auto &it : tr_) {
+    if (it.key + it.priority == addr) {
+      return std::make_pair(std::make_pair(it.key, it.priority), true);
+    } else if (it.key > addr) {
+      break;
+    }
+  }
+  return std::make_pair(std::make_pair(0, 0), false);
+}
+
+void ArenaImpl::Deallocate(void *p) {
   auto *q = get_base_addr(p);
   MemHdr hdr(0, 0);
   MemHdr::getHdr(q, &hdr);
   MS_ASSERT(hdr.sig == 0xDEADBEEF);
   // We are going to insert a free block back to the treap. But first, check if we can combine
   // with the free blocks before and after to form a bigger block.
-  std::unique_lock<std::mutex> lck(mux_);
   // Query if we have a free block after us.
   auto nextBlk = tr_.Search(hdr.addr + hdr.blk_size);
   if (nextBlk.second) {
@@ -101,105 +108,7 @@ void Arena::Deallocate(void *p) {
   tr_.Insert(hdr.addr, hdr.blk_size);
 }
-Status Arena::Reallocate(void **pp, size_t old_sz, size_t new_sz) {
-  MS_ASSERT(pp);
-  MS_ASSERT(*pp);
-  uint64_t actual_size = static_cast<uint64_t>(new_sz) + ARENA_WALL_OVERHEAD_SZ;
-  if (actual_size > this->get_max_size()) {
-    RETURN_STATUS_UNEXPECTED("Request size too big : " + std::to_string(new_sz));
-  }
-  uint64_t req_blk = SizeToBlk(actual_size);
-  char *oldAddr = reinterpret_cast<char *>(*pp);
-  auto *oldHdr = get_base_addr(oldAddr);
-  MemHdr hdr(0, 0);
-  MemHdr::getHdr(oldHdr, &hdr);
-  MS_ASSERT(hdr.sig == 0xDEADBEEF);
-  std::unique_lock<std::mutex> lck(mux_);
-  if (hdr.blk_size > req_blk) {
-    // Refresh the header with the new smaller size.
-    MemHdr::setHdr(oldHdr, hdr.addr, req_blk);
-    // Return the unused memory back to the tree. Unlike allocate, we we need to merge with the block after us.
-    auto next_blk = tr_.Search(hdr.addr + hdr.blk_size);
-    if (next_blk.second) {
-      hdr.blk_size += next_blk.first.priority;
-      tr_.DeleteKey(next_blk.first.key);
-    }
-    tr_.Insert(hdr.addr + req_blk, hdr.blk_size - req_blk);
-  } else if (hdr.blk_size < req_blk) {
-    uint64_t addr = hdr.addr;
-    // Attempt a block enlarge. No guarantee it is always successful.
-    bool success = BlockEnlarge(&addr, hdr.blk_size, req_blk);
-    if (success) {
-      auto *newHdr = static_cast<char *>(ptr_) + addr * ARENA_BLK_SZ;
-      MemHdr::setHdr(newHdr, addr, req_blk);
-      if (addr != hdr.addr) {
-        errno_t err =
-          memmove_s(get_user_addr(newHdr), (req_blk * ARENA_BLK_SZ) - ARENA_WALL_OVERHEAD_SZ, oldAddr, old_sz);
-        if (err) {
-          RETURN_STATUS_UNEXPECTED("Error from memmove: " + std::to_string(err));
-        }
-      }
-      *pp = get_user_addr(newHdr);
-      return Status::OK();
-    }
-    // If we reach here, allocate a new block and simply move the content from the old to the new place.
-    // Unlock since allocate will grab the lock again.
-    lck.unlock();
-    return FreeAndAlloc(pp, old_sz, new_sz);
-  }
-  return Status::OK();
-}
-
-std::ostream &operator<<(std::ostream &os, const Arena &s) {
-  for (auto &it : s.tr_) {
-    os << "Address : " << it.key << ". Size : " << it.priority << "\n";
-  }
-  return os;
-}
-
-Arena::Arena(size_t val_in_MB) : ptr_(nullptr), size_in_MB_(val_in_MB), size_in_bytes_(val_in_MB * 1048576L) {}
-
-Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB) {
-  if (p_ba == nullptr) {
-    RETURN_STATUS_UNEXPECTED("p_ba is null");
-  }
-  Status rc;
-  auto ba = new (std::nothrow) Arena(val_in_MB);
-  if (ba == nullptr) {
-    return Status(StatusCode::kOutOfMemory);
-  }
-  rc = ba->Init();
-  if (rc.IsOk()) {
-    (*p_ba).reset(ba);
-  } else {
-    delete ba;
-  }
-  return rc;
-}
-
-int Arena::PercentFree() const {
-  uint64_t sz = 0;
-  for (auto &it : tr_) {
-    sz += it.priority;
-  }
-  double ratio = static_cast<double>(sz * ARENA_BLK_SZ) / static_cast<double>(size_in_bytes_);
-  return static_cast<int>(ratio * 100.0);
-}
-
-uint64_t Arena::get_max_size() const { return (size_in_bytes_ - ARENA_WALL_OVERHEAD_SZ); }
-
-std::pair<std::pair<uint64_t, uint64_t>, bool> Arena::FindPrevBlk(uint64_t addr) {
-  for (auto &it : tr_) {
-    if (it.key + it.priority == addr) {
-      return std::make_pair(std::make_pair(it.key, it.priority), true);
-    } else if (it.key > addr) {
-      break;
-    }
-  }
-  return std::make_pair(std::make_pair(0, 0), false);
-}
-
-bool Arena::BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz) {
+bool ArenaImpl::BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz) {
   uint64_t size = old_sz;
   // The logic is very much identical to Deallocate. We will see if we can combine with the blocks before and after.
   auto next_blk = tr_.Search(*addr + old_sz);
@@ -237,7 +146,7 @@ bool Arena::BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz) {
   return false;
 }
-Status Arena::FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz) {
+Status ArenaImpl::FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz) {
   MS_ASSERT(pp);
   MS_ASSERT(*pp);
   void *p = nullptr;
@@ -252,5 +161,98 @@ Status Arena::FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz) {
   Deallocate(q);
   return Status::OK();
 }
+
+Status ArenaImpl::Reallocate(void **pp, size_t old_sz, size_t new_sz) {
+  MS_ASSERT(pp);
+  MS_ASSERT(*pp);
+  uint64_t actual_size = static_cast<uint64_t>(new_sz) + ARENA_WALL_OVERHEAD_SZ;
+  if (actual_size > this->get_max_size()) {
+    RETURN_STATUS_UNEXPECTED("Request size too big : " + std::to_string(new_sz));
+  }
+  uint64_t req_blk = SizeToBlk(actual_size);
+  char *oldAddr = reinterpret_cast<char *>(*pp);
+  auto *oldHdr = get_base_addr(oldAddr);
+  MemHdr hdr(0, 0);
+  MemHdr::getHdr(oldHdr, &hdr);
+  MS_ASSERT(hdr.sig == 0xDEADBEEF);
+  if (hdr.blk_size > req_blk) {
+    // Refresh the header with the new smaller size.
+    MemHdr::setHdr(oldHdr, hdr.addr, req_blk);
+    // Return the unused memory back to the tree. Unlike allocate, we need to merge with the block after us.
+    auto next_blk = tr_.Search(hdr.addr + hdr.blk_size);
+    if (next_blk.second) {
+      hdr.blk_size += next_blk.first.priority;
+      tr_.DeleteKey(next_blk.first.key);
+    }
+    tr_.Insert(hdr.addr + req_blk, hdr.blk_size - req_blk);
+  } else if (hdr.blk_size < req_blk) {
+    uint64_t addr = hdr.addr;
+    // Attempt a block enlarge. No guarantee it is always successful.
+    bool success = BlockEnlarge(&addr, hdr.blk_size, req_blk);
+    if (success) {
+      auto *newHdr = static_cast<char *>(ptr_) + addr * ARENA_BLK_SZ;
+      MemHdr::setHdr(newHdr, addr, req_blk);
+      if (addr != hdr.addr) {
+        errno_t err =
+          memmove_s(get_user_addr(newHdr), (req_blk * ARENA_BLK_SZ) - ARENA_WALL_OVERHEAD_SZ, oldAddr, old_sz);
+        if (err) {
+          RETURN_STATUS_UNEXPECTED("Error from memmove: " + std::to_string(err));
+        }
+      }
+      *pp = get_user_addr(newHdr);
+      return Status::OK();
+    }
+    return FreeAndAlloc(pp, old_sz, new_sz);
+  }
+  return Status::OK();
+}
+
+int ArenaImpl::PercentFree() const {
+  uint64_t sz = 0;
+  for (auto &it : tr_) {
+    sz += it.priority;
+  }
+  double ratio = static_cast<double>(sz * ARENA_BLK_SZ) / static_cast<double>(size_in_bytes_);
+  return static_cast<int>(ratio * 100.0);
+}
+
+uint64_t ArenaImpl::SizeToBlk(uint64_t sz) {
+  uint64_t req_blk = sz / ARENA_BLK_SZ;
+  if (sz % ARENA_BLK_SZ) {
+    ++req_blk;
+  }
+  return req_blk;
+}
+
+std::ostream &operator<<(std::ostream &os, const ArenaImpl &s) {
+  for (auto &it : s.tr_) {
+    os << "Address : " << it.key << ". Size : " << it.priority << "\n";
+  }
+  return os;
+}
+
+Status Arena::Init() {
+  try {
+    auto sz = size_in_MB_ * 1048576L;
+    mem_ = std::make_unique<uint8_t[]>(sz);
+    impl_ = std::make_unique<ArenaImpl>(mem_.get(), sz);
+  } catch (std::bad_alloc &e) {
+    return Status(StatusCode::kOutOfMemory);
+  }
+  return Status::OK();
+}
+
+Arena::Arena(size_t val_in_MB) : size_in_MB_(val_in_MB) {}
+
+Status Arena::CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB) {
+  RETURN_UNEXPECTED_IF_NULL(p_ba);
+  auto ba = new (std::nothrow) Arena(val_in_MB);
+  if (ba == nullptr) {
+    return Status(StatusCode::kOutOfMemory);
+  }
+  (*p_ba).reset(ba);
+  RETURN_IF_NOT_OK(ba->Init());
+  return Status::OK();
+}
 }  // namespace dataset
 }  // namespace mindspore
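Not part of the patch: the split into a plain ArenaImpl plus a thin owning wrapper is the heart of this change, so here is a minimal sketch of layering ArenaImpl over caller-owned memory, which is what Arena::Init and CachedSharedMemoryArena::CreateArena now do. The buffer size is illustrative, and locking is deliberately absent because ArenaImpl itself is documented as not thread safe.

```cpp
// Sketch: ArenaImpl manages whatever buffer it is handed; it never allocates or frees it.
void DemoArenaImpl() {
  constexpr size_t kSz = 32 * 1048576;          // 32 MB, illustrative
  auto mem = std::make_unique<uint8_t[]>(kSz);  // could equally be a shmat() pointer
  ArenaImpl impl(mem.get(), kSz);               // carves the buffer into ARENA_BLK_SZ blocks
  void *p = nullptr;
  if (impl.Allocate(1024, &p).IsOk()) {
    MS_LOG(DEBUG) << impl;                      // dumps the free-block treap
    impl.Deallocate(p);
  }
}
```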
Size : " << it.priority << "\n"; + } + return os; +} + +Status Arena::Init() { + try { + auto sz = size_in_MB_ * 1048576L; + mem_ = std::make_unique(sz); + impl_ = std::make_unique(mem_.get(), sz); + } catch (std::bad_alloc &e) { + return Status(StatusCode::kOutOfMemory); + } + return Status::OK(); +} + +Arena::Arena(size_t val_in_MB) : size_in_MB_(val_in_MB) {} + +Status Arena::CreateArena(std::shared_ptr *p_ba, size_t val_in_MB) { + RETURN_UNEXPECTED_IF_NULL(p_ba); + auto ba = new (std::nothrow) Arena(val_in_MB); + if (ba == nullptr) { + return Status(StatusCode::kOutOfMemory); + } + (*p_ba).reset(ba); + RETURN_IF_NOT_OK(ba->Init()); + return Status::OK(); +} } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/util/arena.h b/mindspore/ccsrc/minddata/dataset/util/arena.h index c9c83d027bd..132ff0e7eb2 100644 --- a/mindspore/ccsrc/minddata/dataset/util/arena.h +++ b/mindspore/ccsrc/minddata/dataset/util/arena.h @@ -41,63 +41,111 @@ namespace dataset { /// /// When a block of memory is freed. It is joined with the blocks before and after (if they are available) to /// form a bigger block. -class Arena : public MemoryPool { + +/// At the lowest level, we don't really care where the memory is coming from. +/// This allows other class to make use of Arena method and override the origin of the +/// memory, say from some unix shared memory instead. +/// \note Implementation class is not thread safe. Caller needs to ensure proper serialization +class ArenaImpl { public: - Arena(const Arena &) = delete; + /// Constructor + /// \param ptr The start of the memory address + /// \param sz Size of the memory block we manage + ArenaImpl(void *ptr, size_t sz); + ~ArenaImpl() { ptr_ = nullptr; } - Arena &operator=(const Arena &) = delete; + /// \brief Allocate a sub block + /// \param n Size requested + /// \param p pointer to where the result is stored + /// \return Status object. + Status Allocate(size_t n, void **p); - ~Arena() override { - if (ptr_ != nullptr) { - free(ptr_); - ptr_ = nullptr; - } - } + /// \brief Enlarge or shrink a sub block + /// \param old_sz Original size + /// \param new_sz New size + /// \return Status object + Status Reallocate(void **, size_t old_sz, size_t new_sz); - Status Allocate(size_t n, void **p) override; + /// \brief Free a sub block + /// \param Address of the block to be freed. + void Deallocate(void *); - Status Reallocate(void **, size_t old_sz, size_t new_sz) override; + /// \brief Calculate % free of the memory + /// \return Percent free + int PercentFree() const; - void Deallocate(void *) override; - - uint64_t get_max_size() const override; - - static uint64_t SizeToBlk(uint64_t sz) { - uint64_t req_blk = sz / ARENA_BLK_SZ; - if (sz % ARENA_BLK_SZ) { - ++req_blk; - } - return req_blk; - } - - int PercentFree() const override; + /// \brief What is the maximum we can support in allocate. + /// \return Max value + uint64_t get_max_size() const { return (size_in_bytes_ - ARENA_WALL_OVERHEAD_SZ); } + /// \brief Get the start of the address. 
-  friend std::ostream &operator<<(std::ostream &os, const Arena &s);
+  static uint64_t SizeToBlk(uint64_t sz);
+  friend std::ostream &operator<<(std::ostream &os, const ArenaImpl &s);
+
+ private:
+  size_t size_in_bytes_;
+  Treap<uint64_t, uint64_t> tr_;
+  void *ptr_;
+
+  void *get_user_addr(void *base_addr) const { return reinterpret_cast<char *>(base_addr) + ARENA_WALL_OVERHEAD_SZ; }
+  void *get_base_addr(void *user_addr) const { return reinterpret_cast<char *>(user_addr) - ARENA_WALL_OVERHEAD_SZ; }
+  std::pair<std::pair<uint64_t, uint64_t>, bool> FindPrevBlk(uint64_t addr);
+  bool BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz);
+  Status FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz);
+};
+
+/// \brief This version of Arena allocates from private memory
+class Arena : public MemoryPool {
+ public:
+  // Disable copy and assignment constructor
+  Arena(const Arena &) = delete;
+  Arena &operator=(const Arena &) = delete;
+  ~Arena() override = default;
+
+  /// As a derived class of MemoryPool, we have to implement the following.
+  /// But we simply transfer the call to the implementation class
+  Status Allocate(size_t size, void **pVoid) override {
+    std::unique_lock<std::mutex> lock(mux_);
+    return impl_->Allocate(size, pVoid);
+  }
+  Status Reallocate(void **pVoid, size_t old_sz, size_t new_sz) override {
+    std::unique_lock<std::mutex> lock(mux_);
+    return impl_->Reallocate(pVoid, old_sz, new_sz);
+  }
+  void Deallocate(void *pVoid) override {
+    std::unique_lock<std::mutex> lock(mux_);
+    impl_->Deallocate(pVoid);
+  }
+  uint64_t get_max_size() const override { return impl_->get_max_size(); }
+  int PercentFree() const override {
+    std::unique_lock<std::mutex> lock(mux_);
+    return impl_->PercentFree();
+  }
+
+  /// \return Return the start of the memory block
+  const void *get_base_addr() const { return impl_->get_base_addr(); }
+
+  /// \brief Dump the memory allocation block.
+  friend std::ostream &operator<<(std::ostream &os, const Arena &s) {
+    os << *(s.impl_);
+    return os;
+  }
+
+  /// The only method to create an arena.
   static Status CreateArena(std::shared_ptr<Arena> *p_ba, size_t val_in_MB = 4096);
 
  protected:
-  std::mutex mux_;
-  Treap<uint64_t, uint64_t> tr_;
-  void *ptr_;
+  mutable std::mutex mux_;
+  std::unique_ptr<ArenaImpl> impl_;
+  std::unique_ptr<uint8_t[]> mem_;
   size_t size_in_MB_;
-  size_t size_in_bytes_;
 
   explicit Arena(size_t val_in_MB = 4096);
-
-  std::pair<std::pair<uint64_t, uint64_t>, bool> FindPrevBlk(uint64_t addr);
-
   Status Init();
-
-  bool BlockEnlarge(uint64_t *addr, uint64_t old_sz, uint64_t new_sz);
-
-  Status FreeAndAlloc(void **pp, size_t old_sz, size_t new_sz);
-
-  void *get_user_addr(void *base_addr) const { return reinterpret_cast<char *>(base_addr) + ARENA_WALL_OVERHEAD_SZ; }
-
-  void *get_base_addr(void *user_addr) const { return reinterpret_cast<char *>(user_addr) - ARENA_WALL_OVERHEAD_SZ; }
 };
 }  // namespace dataset
 }  // namespace mindspore
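Not part of the patch: for contrast with the ArenaImpl sketch earlier, a short illustration of the locking Arena wrapper as it is expected to be called (it mirrors Test1 in arena_test.cc further down). The 64 MB pool size and request sizes are illustrative.

```cpp
// Sketch: Arena owns its private buffer and serializes every MemoryPool call on mux_.
Status DemoArena() {
  std::shared_ptr<Arena> arena;
  RETURN_IF_NOT_OK(Arena::CreateArena(&arena, 64));      // 64 MB pool
  void *p = nullptr;
  RETURN_IF_NOT_OK(arena->Allocate(4096, &p));
  RETURN_IF_NOT_OK(arena->Reallocate(&p, 4096, 8192));   // may move the block; p is updated
  MS_LOG(DEBUG) << "Free: " << arena->PercentFree() << "%";
  arena->Deallocate(p);
  return Status::OK();
}
```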
diff --git a/mindspore/ccsrc/minddata/dataset/util/buddy.cc b/mindspore/ccsrc/minddata/dataset/util/buddy.cc
index d4f5434f811..bbca344bb67 100644
--- a/mindspore/ccsrc/minddata/dataset/util/buddy.cc
+++ b/mindspore/ccsrc/minddata/dataset/util/buddy.cc
@@ -47,10 +47,16 @@ Status BuddySpace::Init() {
   size_t offset_1 = sizeof(rel_addr_t) * num_lvl_;
   size_t offset_2 = sizeof(int) * num_lvl_ + offset_1;
   size_t offset_3 = sizeof(char) * BitLeftShift(1, num_lvl_ - 3) + offset_2;
-  RETURN_IF_NOT_OK(DeMalloc(offset_3, &ptr_, true));
-  hint_ = reinterpret_cast<rel_addr_t *>(ptr_);
-  count_ = reinterpret_cast<int *>((reinterpret_cast<char *>(ptr_) + offset_1));
-  map_ = reinterpret_cast<char *>(ptr_) + offset_2;
+  try {
+    mem_ = std::make_unique<uint8_t[]>(offset_3);
+  } catch (const std::bad_alloc &e) {
+    return Status(StatusCode::kOutOfMemory);
+  }
+  (void)memset_s(mem_.get(), offset_3, 0, offset_3);
+  auto ptr = mem_.get();
+  hint_ = reinterpret_cast<rel_addr_t *>(ptr);
+  count_ = reinterpret_cast<int *>((reinterpret_cast<char *>(ptr) + offset_1));
+  map_ = reinterpret_cast<char *>(ptr) + offset_2;
   count_[num_lvl_ - 1] = 1;
   map_[0] = BitOr(MORE_BIT, num_lvl_ - 3);
   return Status::OK();
@@ -352,19 +358,9 @@ int BuddySpace::PercentFree() const {
 }
 
 BuddySpace::BuddySpace(int log_min, int num_lvl)
-    : hint_(nullptr),
-      count_(nullptr),
-      map_(nullptr),
-      log_min_(log_min),
-      num_lvl_(num_lvl),
-      min_(0),
-      max_(0),
-      ptr_(nullptr) {}
+    : hint_(nullptr), count_(nullptr), map_(nullptr), log_min_(log_min), num_lvl_(num_lvl), min_(0), max_(0) {}
 
 BuddySpace::~BuddySpace() {
-  if (ptr_ != nullptr) {
-    free(ptr_);
-  }
   hint_ = nullptr;
   count_ = nullptr;
   map_ = nullptr;
diff --git a/mindspore/ccsrc/minddata/dataset/util/buddy.h b/mindspore/ccsrc/minddata/dataset/util/buddy.h
index d95c2b0528b..97834c1c436 100644
--- a/mindspore/ccsrc/minddata/dataset/util/buddy.h
+++ b/mindspore/ccsrc/minddata/dataset/util/buddy.h
@@ -94,7 +94,7 @@ class BuddySpace {
   int num_lvl_;
   uint64_t min_;
   uint64_t max_;
-  void *ptr_;
+  std::unique_ptr<uint8_t[]> mem_;
   std::mutex mutex_;
   explicit BuddySpace(int log_min = 15, int num_lvl = 18);
diff --git a/mindspore/ccsrc/minddata/dataset/util/queue.h b/mindspore/ccsrc/minddata/dataset/util/queue.h
index 021ee5ab102..417a1ab507c 100644
--- a/mindspore/ccsrc/minddata/dataset/util/queue.h
+++ b/mindspore/ccsrc/minddata/dataset/util/queue.h
@@ -33,18 +33,6 @@
 namespace mindspore {
 namespace dataset {
-template <typename T>
-struct is_shared_ptr : public std::false_type {};
-
-template <typename T>
-struct is_shared_ptr<std::shared_ptr<T>> : public std::true_type {};
-
-template <typename T>
-struct is_unique_ptr : public std::false_type {};
-
-template <typename T>
-struct is_unique_ptr<std::unique_ptr<T>> : public std::true_type {};
-
 // A simple thread safe queue using a fixed size array
 template <typename T>
 class Queue {
@@ -55,44 +43,25 @@ class Queue {
   using reference = T &;
   using const_reference = const T &;
 
-  void Init() {
-    if (sz_ > 0) {
-      // We allocate a block of memory and then call the default constructor for each slot. Maybe simpler to call
-      // new[] but we want to control where the memory is allocated from.
-      arr_ = alloc_.allocate(sz_);
-      for (uint64_t i = 0; i < sz_; i++) {
-        std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[i]));
-      }
-    }
-  }
-
   explicit Queue(int sz)
-      : sz_(sz),
-        arr_(nullptr),
-        head_(0),
-        tail_(0),
-        my_name_(Services::GetUniqueID()),
-        alloc_(Services::GetInstance().GetServiceMemPool()) {
-    Init();
-    MS_LOG(DEBUG) << "Create Q with uuid " << my_name_ << " of size " << sz_ << ".";
-  }
-
-  virtual ~Queue() {
-    ResetQue();
-    if (arr_) {
-      // Simply free the pointer. Since there is nothing in the queue. We don't want to invoke the destructor
-      // of T in each slot.
-      alloc_.deallocate(arr_);
-      arr_ = nullptr;
+      : sz_(sz), arr_(Services::GetAllocator()), head_(0), tail_(0), my_name_(Services::GetUniqueID()) {
+    Status rc = arr_.allocate(sz);
+    if (rc.IsError()) {
+      MS_LOG(ERROR) << "Fail to create a queue.";
+      std::terminate();
+    } else {
+      MS_LOG(DEBUG) << "Create Q with uuid " << my_name_ << " of size " << sz_ << ".";
     }
   }
 
-  int size() const {
-    int v = tail_ - head_;
+  virtual ~Queue() { ResetQue(); }
+
+  size_t size() const {
+    size_t v = tail_ - head_;
     return (v >= 0) ? v : 0;
   }
 
-  int capacity() const { return sz_; }
+  size_t capacity() const { return sz_; }
 
   bool empty() const { return head_ == tail_; }
@@ -104,8 +73,8 @@ class Queue {
     // Block when full
     Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); });
     if (rc.IsOk()) {
-      uint32_t k = tail_++ % sz_;
-      arr_[k] = ele;
+      auto k = tail_++ % sz_;
+      *(arr_[k]) = ele;
       empty_cv_.NotifyAll();
       _lock.unlock();
     } else {
@@ -119,8 +88,8 @@ class Queue {
     // Block when full
     Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); });
     if (rc.IsOk()) {
-      uint32_t k = tail_++ % sz_;
-      arr_[k] = std::forward<T>(ele);
+      auto k = tail_++ % sz_;
+      *(arr_[k]) = std::forward<T>(ele);
       empty_cv_.NotifyAll();
       _lock.unlock();
     } else {
@@ -135,8 +104,8 @@ class Queue {
     // Block when full
     Status rc = full_cv_.Wait(&_lock, [this]() -> bool { return (size() != capacity()); });
    if (rc.IsOk()) {
-      uint32_t k = tail_++ % sz_;
-      new (&(arr_[k])) T(std::forward(args)...);
+      auto k = tail_++ % sz_;
+      new (arr_[k]) T(std::forward(args)...);
       empty_cv_.NotifyAll();
       _lock.unlock();
     } else {
@@ -151,20 +120,8 @@ class Queue {
     // Block when empty
     Status rc = empty_cv_.Wait(&_lock, [this]() -> bool { return !empty(); });
     if (rc.IsOk()) {
-      uint32_t k = head_++ % sz_;
-      *p = std::move(arr_[k]);
-      if (std::is_destructible<T>::value) {
-        // std::move above only changes arr_[k] from rvalue to lvalue.
-        // The real implementation of move constructor depends on T.
-        // It may be compiler generated or user defined. But either case
-        // the result of arr_[k] is still a valid object of type T, and
-        // we will not keep any extra copy in the queue.
-        arr_[k].~T();
-        // For gcc 9, an extra fix is needed here to clear the memory content
-        // of arr_[k] because this slot can be reused by another Add which can
-        // do another std::move. We have seen SEGV here in this case.
-        std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[k]));
-      }
+      auto k = head_++ % sz_;
+      *p = std::move(*(arr_[k]));
       full_cv_.NotifyAll();
       _lock.unlock();
     } else {
@@ -175,15 +132,15 @@ class Queue {
   void ResetQue() noexcept {
     std::unique_lock<std::mutex> _lock(mux_);
-    // If there are elements in the queue, invoke its destructor one by one.
-    if (!empty() && std::is_destructible<T>::value) {
-      for (uint64_t i = head_; i < tail_; i++) {
-        uint32_t k = i % sz_;
-        arr_[k].~T();
-      }
-    }
-    for (uint64_t i = 0; i < sz_; i++) {
-      std::allocator_traits<Allocator<T>>::construct(alloc_, &(arr_[i]));
+    // If there are elements in the queue, drain them. We won't call PopFront directly
+    // because we have got the lock already. We will deadlock if we call PopFront
+    for (auto i = head_; i < tail_; ++i) {
+      auto k = i % sz_;
+      auto val = std::move(*(arr_[k]));
+      // Let val go out of scope and its destructor will be invoked automatically.
+      // But our compiler may complain val is not in use. So let's do some useless
+      // stuff.
+      MS_LOG(DEBUG) << "Address of val: " << &val;
     }
     empty_cv_.ResetIntrpState();
     full_cv_.ResetIntrpState();
@@ -202,15 +159,14 @@
   }
 
  private:
-  uint64_t sz_;
-  pointer arr_;
-  uint64_t head_;
-  uint64_t tail_;
+  size_t sz_;
+  MemGuard<T, Allocator<T>> arr_;
+  size_t head_;
+  size_t tail_;
   std::string my_name_;
   std::mutex mux_;
   CondVar empty_cv_;
   CondVar full_cv_;
-  Allocator<T> alloc_;
 };
 
 // A container of queues with [] operator accessors. Basically this is a wrapper over of a vector of queues
 template <typename T>
 class QueueList {
@@ -237,7 +193,7 @@ class QueueList {
     return Status::OK();
   }
 
-  int size() const { return queue_list_.size(); }
+  auto size() const { return queue_list_.size(); }
 
   std::unique_ptr<Queue<T>> &operator[](const int index) { return queue_list_[index]; }
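Not part of the patch: a sketch of the Queue surface after the MemGuard rework. The blocking Add/PopFront behaviour and the slot access pattern are taken from the hunks above; treating Add and PopFront as Status-returning and calling Reset directly are assumptions for the purpose of the example.

```cpp
// Sketch: slots now live inside a MemGuard and are accessed as *(arr_[k]).
Status DemoQueue() {
  Queue<int> q(4);                   // capacity 4; slots default-constructed by MemGuard
  RETURN_IF_NOT_OK(q.Add(1));        // copy-assigns into a slot, blocks when the queue is full
  int v = 0;
  RETURN_IF_NOT_OK(q.PopFront(&v));  // moves the slot out; the moved-from slot stays constructed
  q.Reset();                         // drains any remaining elements under the lock
  return Status::OK();
}
```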
diff --git a/tests/ut/cpp/dataset/arena_test.cc b/tests/ut/cpp/dataset/arena_test.cc
index 10d27b51c66..08f4da32d80 100644
--- a/tests/ut/cpp/dataset/arena_test.cc
+++ b/tests/ut/cpp/dataset/arena_test.cc
@@ -15,7 +15,9 @@
  */
 #include
+#include "minddata/dataset/util/allocator.h"
 #include "minddata/dataset/util/arena.h"
+#include "minddata/dataset/util/system_pool.h"
 #include "common/common.h"
 #include "utils/log_adapter.h"
@@ -27,11 +29,10 @@ class MindDataTestArena : public UT::Common {
 };
 
-TEST_F(MindDataTestArena, TestALLFunction) {
+TEST_F(MindDataTestArena, Test1) {
   std::shared_ptr mp;
   Status rc = Arena::CreateArena(&mp);
   ASSERT_TRUE(rc.IsOk());
-  std::shared_ptr arena = std::dynamic_pointer_cast(mp);
   std::vector v;
 
   srand(time(NULL));
@@ -46,3 +47,25 @@ TEST_F(MindDataTestArena, TestALLFunction) {
   }
   MS_LOG(DEBUG) << *mp;
 }
+
+TEST_F(MindDataTestArena, Test2) {
+  std::shared_ptr<Arena> arena;
+  Status rc = Arena::CreateArena(&arena);
+  std::shared_ptr<MemoryPool> mp = std::static_pointer_cast<MemoryPool>(arena);
+  auto alloc = Allocator(mp);
+  ASSERT_TRUE(rc.IsOk());
+  std::vector> v(alloc);
+  v.reserve(1000);
+  for (auto i = 0; i < 1000; ++i) {
+    v.push_back(i);
+  }
+  // Test copy
+  std::vector> w(v, SystemPool::GetAllocator());
+  auto val = w.at(10);
+  EXPECT_EQ(val, 10);
+  // Test move
+  std::vector> s(std::move(v), SystemPool::GetAllocator());
+  val = s.at(100);
+  EXPECT_EQ(val, 100);
+  EXPECT_EQ(v.size(), 0);
+}
diff --git a/tests/ut/cpp/dataset/queue_test.cc b/tests/ut/cpp/dataset/queue_test.cc
index ec40cc2ae48..fcc4e1a54d8 100644
--- a/tests/ut/cpp/dataset/queue_test.cc
+++ b/tests/ut/cpp/dataset/queue_test.cc
@@ -50,6 +50,13 @@ class RefCount {
     v_ = o.v_;
     return *this;
   }
+  RefCount(RefCount &&o) : v_(std::move(o.v_)) {}
+  RefCount &operator=(RefCount &&o) {
+    if (&o != this) {
+      v_ = std::move(o.v_);
+    }
+    return *this;
+  }
 
   std::shared_ptr v_;
 };
@@ -148,8 +155,9 @@ TEST_F(MindDataTestQueue, Test4) { test4(); }
 
 TEST_F(MindDataTestQueue, Test5) {
   test4();
   // Assume we have run Test4. The destructor of the RefCount should be called 4 times.
-  // One for a. One for b. One for line 125 when we pop. One for the stale element in the queue.
-  ASSERT_EQ(gRefCountDestructorCalled, 4);
+  // One for a. One for b. One for the stale element in the queue. 3 more for
+  // the one in the queue (but they are empty).
+  ASSERT_EQ(gRefCountDestructorCalled, 6);
 }
 
 TEST_F(MindDataTestQueue, Test6) {
@@ -169,70 +177,3 @@ TEST_F(MindDataTestQueue, Test6) {
   MS_LOG(INFO) << "Popped value " << *pepped_value << " from queue index " << chosen_queue_index;
   ASSERT_EQ(*pepped_value, 99);
 }
-using namespace std::chrono;
-template <typename QueueType, typename PayloadType>
-void Perf(int n, int p, std::string name) {
-  auto payload = std::vector<PayloadType>(n, PayloadType(p));
-  auto queue = QueueType(n);
-  auto t0 = high_resolution_clock::now();
-  auto check = 0;
-  for (int i = 0; i < queue.capacity(); i++) {
-    queue.Add(PayloadType(p));
-  }
-  check = queue.size();
-  for (int i = 0; i < queue.capacity(); i++) {
-    queue.PopFront(&payload[i]);
-  }
-  auto t1 = high_resolution_clock::now();
-  std::cout << name << " queue filled size: " << queue.size() << " " << check << std::endl;
-  auto t2 = high_resolution_clock::now();
-  for (int i = 0; i < queue.capacity(); i++) {
-    queue.Add(PayloadType(p));
-  }
-  check = queue.size();
-  for (int i = 0; i < queue.capacity(); i++) {
-    queue.PopFront(&payload[i]);
-  }
-  auto t3 = high_resolution_clock::now();
-  auto d = duration_cast<milliseconds>(t3 - t2 + t1 - t0).count();
-  std::cout << name << " queue emptied size: " << queue.size() << " " << check << std::endl;
-  std::cout << name << " "
-            << " ran in " << d << "ms" << std::endl;
-}
-
-template <typename QueueType, typename PayloadType>
-void Fuzz(int n, int p, std::string name) {
-  std::mt19937 gen(1);
-  auto payload = std::vector<PayloadType>(n, PayloadType(p));
-  auto queue = QueueType(n);
-  auto dist = std::uniform_int_distribution(0, 2);
-  std::cout << "###" << std::endl;
-  for (auto i = 0; i < n; i++) {
-    auto v = dist(gen);
-    if (v == 0 && queue.size() < n - 1) {
-      queue.Add(std::move(payload[i]));
-    }
-    if (v == 1 && queue.size() > 0) {
-      queue.PopFront(&payload[i]);
-    } else {
-      queue.Reset();
-    }
-  }
-  std::cout << name << " fuzz ran " << queue.size() << std::endl;
-}
-TEST_F(MindDataTestQueue, TestPerf) {
-  try {
-    int kSz = 1000000;
-    // std::cout << "enter size" << std::endl;
-    // std::cin >> kSz;
-    Perf>, std::vector>(kSz, 1, "old queue, vector of size 1");
-  } catch (const std::exception &e) {
-    std::cout << e.what() << std::endl;
-  }
-
-  std::cout << "Test Reset" << std::endl;
-  std::cout << "Enter fuzz size" << std::endl;
-  int fs = 1000;
-// std::cin >> fs;
-  Fuzz>, std::vector>(fs, 1, "New queue");
-}