Addressing late comment

Jesse Lee 2020-07-13 14:44:46 -04:00
parent eadcb341e1
commit 2909b637d8
4 changed files with 8 additions and 7 deletions

View File

@@ -105,7 +105,7 @@ Status CacheService::CacheRow(const std::vector<const void *> &buf, row_id_type
   RETURN_IF_NOT_OK(cp_->Insert(all_data, &key));
   Status rc = map_->DoInsert(*row_id_generated, key);
   if (rc == Status(StatusCode::kDuplicateKey)) {
-    MS_LOG(DEBUG) << "Ignoring duplicate key";
+    MS_LOG(DEBUG) << "Ignoring duplicate key.";
   } else {
     RETURN_IF_NOT_OK(rc);
   }
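The log message touched above sits in a tolerant-insert pattern: a kDuplicateKey status from DoInsert is expected when a row is cached twice and is only noted at debug level, while any other error still propagates to the caller. A minimal, self-contained sketch of that pattern (Status, DoInsert, and CacheRowSketch here are illustrative stand-ins, not the real MindSpore classes):

#include <cstdint>
#include <iostream>
#include <map>

enum class StatusCode { kOK, kDuplicateKey, kUnexpectedError };

struct Status {
  StatusCode code = StatusCode::kOK;
  bool IsError() const { return code != StatusCode::kOK; }
};

// Stand-in for map_->DoInsert(): fails with kDuplicateKey if the row id is already mapped.
Status DoInsert(std::map<int64_t, int64_t> *m, int64_t row_id, int64_t key) {
  return m->emplace(row_id, key).second ? Status{} : Status{StatusCode::kDuplicateKey};
}

Status CacheRowSketch(std::map<int64_t, int64_t> *m, int64_t row_id, int64_t key) {
  Status rc = DoInsert(m, row_id, key);
  if (rc.code == StatusCode::kDuplicateKey) {
    // Re-caching an already cached row is expected; note it at debug level and move on.
    std::cerr << "Ignoring duplicate key.\n";
  } else if (rc.IsError()) {
    return rc;  // Any other failure propagates to the caller.
  }
  return Status{};
}

int main() {
  std::map<int64_t, int64_t> index;
  CacheRowSketch(&index, /*row_id=*/7, /*key=*/100);
  CacheRowSketch(&index, /*row_id=*/7, /*key=*/101);  // Hits the duplicate-key branch.
  return 0;
}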

View File

@@ -53,7 +53,7 @@ CacheBase::CacheBase(int32_t num_workers, int32_t op_connector_size, int32_t row
       cache_client_(cache_client),
       rows_per_buffer_(rows_per_buf),
       // We can cause deadlock if this internal Connector size is too small.
-      keys_miss_(num_workers_, 1, 1024) {
+      keys_miss_(num_workers_, 1, connector_capacity_) {
   io_block_queues_.Init(num_workers, op_connector_size);
 }
 // Common function to fetch samples from the sampler and send them using the io_block_queues to

View File

@@ -48,8 +48,6 @@ class CacheBase : public ParallelOp {
   /// \brief Destructor
   ~CacheBase();
-
-  constexpr static int eoe_row_id = -1;
 
   /// \brief Overrides base class reset method. When an operator does a reset, it cleans up any state
   /// info from it's previous execution and then initializes itself so that it can be executed
   /// again.
@@ -80,6 +78,7 @@ class CacheBase : public ParallelOp {
   virtual bool AllowCacheMiss() = 0;
 
  protected:
+  constexpr static int32_t eoe_row_id = -1;
   std::shared_ptr<CacheClient> cache_client_;
   WaitPost epoch_sync_;
   int32_t rows_per_buffer_;
@@ -100,6 +99,7 @@ class CacheBase : public ParallelOp {
   Status UpdateColumnMapFromCache();
 
  private:
+  constexpr static int32_t connector_capacity_ = 1024;
   QueueList<std::unique_ptr<IOBlock>> io_block_queues_;
 };
 } // namespace dataset
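Taken together, the two cache_base changes above replace the magic connector size 1024 with a private class constant, connector_capacity_, declared once in the header and consumed in the constructor's initializer list. A minimal sketch of that single-point-of-truth pattern (MiniConnector and MiniCacheBase are hypothetical stand-ins for the real Connector and CacheBase types):

#include <cstdint>

// Stand-in for the real Connector: only records its sizing parameters.
struct MiniConnector {
  MiniConnector(int32_t producers, int32_t consumers, int32_t capacity)
      : producers_(producers), consumers_(consumers), capacity_(capacity) {}
  int32_t producers_;
  int32_t consumers_;
  int32_t capacity_;
};

class MiniCacheBase {
 public:
  explicit MiniCacheBase(int32_t num_workers)
      // The capacity comes from the named constant instead of a literal 1024,
      // so the declaration and every use can never drift apart.
      : keys_miss_(num_workers, 1, connector_capacity_) {}

 private:
  constexpr static int32_t connector_capacity_ = 1024;
  MiniConnector keys_miss_;
};

int main() {
  MiniCacheBase op(/*num_workers=*/4);
  return 0;
}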

View File

@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "dataset/engine/datasetops/cache_merge_op.h"
 #include <algorithm>
 #include <functional>
@@ -20,7 +21,6 @@
 #include "dataset/core/config_manager.h"
 #include "dataset/core/constants.h"
 #include "dataset/core/global_context.h"
-#include "dataset/engine/datasetops/cache_merge_op.h"
 #include "dataset/engine/opt/pass.h"
 #include "dataset/engine/execution_tree.h"
 #include "dataset/util/task_manager.h"
@@ -50,7 +50,8 @@ Status CacheMergeOp::operator()() {
   // A queue of row id to let cleaner send cache miss rows to the cache server
   // We don't want a small queue as this will block the parallel op workers.
   // A row id is 8 byte integer. So bigger size doesn't consume a lot of memory.
-  io_que_ = std::make_unique<Queue<row_id_type>>(512);
+  static const int32_t queue_sz = 512;
+  io_que_ = std::make_unique<Queue<row_id_type>>(queue_sz);
   RETURN_IF_NOT_OK(io_que_->Register(tree_->AllTasks()));
   RETURN_IF_NOT_OK(
       tree_->LaunchWorkers(num_workers_, std::bind(&CacheMergeOp::WorkerEntry, this, std::placeholders::_1)));
@@ -151,7 +152,7 @@ Status CacheMergeOp::Cleaner() {
     }
     TensorRow row;
     RETURN_IF_NOT_OK(rq->Release(&row));
-    CHECK_FAIL_RETURN_UNEXPECTED(!row.empty(), "Programming error");
+    CHECK_FAIL_RETURN_UNEXPECTED(!row.empty(), "Programming error.");
     Status rc = cache_client_->WriteRow(row);
     // Bad rc should not bring down the pipeline
     if (rc.IsError()) {
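The comment above, "Bad rc should not bring down the pipeline," describes the cleaner's error policy: a failed write-back of a single cache-miss row is handled locally so the rest of the pipeline keeps running. A minimal sketch of that log-and-continue policy (WriteRowSketch and this Status type are illustrative stand-ins, assuming the real cleaner simply records the failure and moves on to the next row):

#include <cstdint>
#include <iostream>
#include <vector>

struct Status {
  bool ok = true;
  bool IsError() const { return !ok; }
};

// Stand-in for cache_client_->WriteRow(): pretend even-numbered rows fail to write.
Status WriteRowSketch(int64_t row_id) { return Status{row_id % 2 != 0}; }

int main() {
  const std::vector<int64_t> cache_miss_rows = {1, 2, 3, 4, 5};
  for (int64_t row_id : cache_miss_rows) {
    Status rc = WriteRowSketch(row_id);
    if (rc.IsError()) {
      // A bad rc should not bring down the pipeline: record it and keep cleaning.
      std::cerr << "Skipping row " << row_id << " after a failed cache write.\n";
      continue;
    }
    std::cout << "Row " << row_id << " written back to the cache server.\n";
  }
  return 0;
}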