diff --git a/mindspore/ccsrc/dataset/engine/cache/cache_service.cc b/mindspore/ccsrc/dataset/engine/cache/cache_service.cc
index 1cbe3fdb4e1..555413a5662 100644
--- a/mindspore/ccsrc/dataset/engine/cache/cache_service.cc
+++ b/mindspore/ccsrc/dataset/engine/cache/cache_service.cc
@@ -105,7 +105,7 @@ Status CacheService::CacheRow(const std::vector &buf, row_id_type
   RETURN_IF_NOT_OK(cp_->Insert(all_data, &key));
   Status rc = map_->DoInsert(*row_id_generated, key);
   if (rc == Status(StatusCode::kDuplicateKey)) {
-    MS_LOG(DEBUG) << "Ignoring duplicate key";
+    MS_LOG(DEBUG) << "Ignoring duplicate key.";
   } else {
     RETURN_IF_NOT_OK(rc);
   }
diff --git a/mindspore/ccsrc/dataset/engine/datasetops/cache_base_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/cache_base_op.cc
index 42d3f0fee3d..c943f8bd7ab 100644
--- a/mindspore/ccsrc/dataset/engine/datasetops/cache_base_op.cc
+++ b/mindspore/ccsrc/dataset/engine/datasetops/cache_base_op.cc
@@ -53,7 +53,7 @@ CacheBase::CacheBase(int32_t num_workers, int32_t op_connector_size, int32_t row
       cache_client_(cache_client),
       rows_per_buffer_(rows_per_buf),
       // We can cause deadlock if this internal Connector size is too small.
-      keys_miss_(num_workers_, 1, 1024) {
+      keys_miss_(num_workers_, 1, connector_capacity_) {
   io_block_queues_.Init(num_workers, op_connector_size);
 }
 // Common function to fetch samples from the sampler and send them using the io_block_queues to
diff --git a/mindspore/ccsrc/dataset/engine/datasetops/cache_base_op.h b/mindspore/ccsrc/dataset/engine/datasetops/cache_base_op.h
index a6a98fc4ad8..9f90b7cd9d1 100644
--- a/mindspore/ccsrc/dataset/engine/datasetops/cache_base_op.h
+++ b/mindspore/ccsrc/dataset/engine/datasetops/cache_base_op.h
@@ -48,8 +48,6 @@ class CacheBase : public ParallelOp {
   /// \brief Destructor
   ~CacheBase();
 
-  constexpr static int eoe_row_id = -1;
-
   /// \brief Overrides base class reset method. When an operator does a reset, it cleans up any state
   /// info from it's previous execution and then initializes itself so that it can be executed
   /// again.
@@ -80,6 +78,7 @@ class CacheBase : public ParallelOp {
   virtual bool AllowCacheMiss() = 0;
 
  protected:
+  constexpr static int32_t eoe_row_id = -1;
   std::shared_ptr<CacheClient> cache_client_;
   WaitPost epoch_sync_;
   int32_t rows_per_buffer_;
@@ -100,6 +99,7 @@ class CacheBase : public ParallelOp {
   Status UpdateColumnMapFromCache();
 
  private:
+  constexpr static int32_t connector_capacity_ = 1024;
   QueueList<std::unique_ptr<IOBlock>> io_block_queues_;
 };
 }  // namespace dataset
diff --git a/mindspore/ccsrc/dataset/engine/datasetops/cache_merge_op.cc b/mindspore/ccsrc/dataset/engine/datasetops/cache_merge_op.cc
index 5d00ec071f2..f2d5173348d 100644
--- a/mindspore/ccsrc/dataset/engine/datasetops/cache_merge_op.cc
+++ b/mindspore/ccsrc/dataset/engine/datasetops/cache_merge_op.cc
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "dataset/engine/datasetops/cache_merge_op.h"
 #include
 #include
@@ -20,7 +21,6 @@
 #include "dataset/core/config_manager.h"
 #include "dataset/core/constants.h"
 #include "dataset/core/global_context.h"
-#include "dataset/engine/datasetops/cache_merge_op.h"
 #include "dataset/engine/opt/pass.h"
 #include "dataset/engine/execution_tree.h"
 #include "dataset/util/task_manager.h"
@@ -50,7 +50,8 @@ Status CacheMergeOp::operator()() {
   // A queue of row id to let cleaner send cache miss rows to the cache server
   // We don't want a small queue as this will block the parallel op workers.
   // A row id is 8 byte integer. So bigger size doesn't consume a lot of memory.
-  io_que_ = std::make_unique<Queue<row_id_type>>(512);
+  static const int32_t queue_sz = 512;
+  io_que_ = std::make_unique<Queue<row_id_type>>(queue_sz);
   RETURN_IF_NOT_OK(io_que_->Register(tree_->AllTasks()));
   RETURN_IF_NOT_OK(
     tree_->LaunchWorkers(num_workers_, std::bind(&CacheMergeOp::WorkerEntry, this, std::placeholders::_1)));
@@ -151,7 +152,7 @@ Status CacheMergeOp::Cleaner() {
     }
     TensorRow row;
     RETURN_IF_NOT_OK(rq->Release(&row));
-    CHECK_FAIL_RETURN_UNEXPECTED(!row.empty(), "Programming error");
+    CHECK_FAIL_RETURN_UNEXPECTED(!row.empty(), "Programming error.");
     Status rc = cache_client_->WriteRow(row);
     // Bad rc should not bring down the pipeline
     if (rc.IsError()) {
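For context, the recurring change in this patch is hoisting magic numbers (the `1024` connector capacity, the `512` queue size) into named `constexpr static` / `static const` constants declared once in the class. Below is a minimal, self-contained sketch of that pattern; `BoundedConnector`, `MiniOp`, and `kConnectorCapacity` are hypothetical stand-ins, not the actual MindSpore classes.

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical stand-in for an internal bounded connector/queue.
class BoundedConnector {
 public:
  explicit BoundedConnector(int32_t capacity) : capacity_(capacity) {}
  int32_t capacity() const { return capacity_; }

 private:
  int32_t capacity_;
};

// The capacity lives in one named class constant instead of a bare 1024
// repeated in the member initializer list, mirroring connector_capacity_.
class MiniOp {
 public:
  MiniOp() : keys_miss_(kConnectorCapacity) {}
  const BoundedConnector &keys_miss() const { return keys_miss_; }

 private:
  constexpr static int32_t kConnectorCapacity = 1024;  // was a magic number
  BoundedConnector keys_miss_;
};

int main() {
  MiniOp op;
  std::cout << "connector capacity: " << op.keys_miss().capacity() << std::endl;
  return 0;
}
```

The same reasoning applies to `queue_sz` in `CacheMergeOp::operator()`: the value is unchanged, but the intent (a deliberately generous queue to avoid blocking the parallel workers) is now attached to a named constant rather than an inline literal.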