Return kLiteServiceDeny when the number of waiting tasks in the queue exceeds the limit

This commit is contained in:
yefeng 2022-04-28 10:01:43 +08:00
parent 4d088ae2c7
commit a7e625245e
3 changed files with 8 additions and 1 deletion

View File

@@ -80,6 +80,7 @@ enum StatusCode : uint32_t {
   kLiteThreadPoolError = kLite | (0x0FFFFFFF & -8),    /**< Error occur in thread pool. */
   kLiteUninitializedObj = kLite | (0x0FFFFFFF & -9),   /**< Object is not initialized. */
   kLiteFileError = kLite | (0x0FFFFFFF & -10),         /**< Invalid file. */
+  kLiteServiceDeny = kLite | (0x0FFFFFFF & -11),       /**< Denial of service. */
   // Executor error code, range: [-100,-200)
   kLiteOutOfTensorRange = kLite | (0x0FFFFFFF & -100), /**< Failed to check range. */

View File

@@ -57,7 +57,7 @@ Status ModelParallelRunner::Predict(const std::vector<MSTensor> &inputs, std::ve
   auto status = model_pool_->Predict(inputs, outputs, before, after);
   if (status != kSuccess) {
     MS_LOG(ERROR) << "model runner predict failed.";
-    return kLiteError;
+    return status;
   }
   return kSuccess;
 }

View File

@@ -29,6 +29,7 @@ namespace mindspore {
 namespace {
 constexpr int32_t kNumThreads = 8;
 constexpr int kNumDeviceInfo = 2;
+constexpr int kNumMaxTaskQueueSize = 1000;
 int GetCoreNum() {
   int core_num = 1;
 #if defined(_MSC_VER) || defined(_WIN32)
@@ -639,6 +640,11 @@ Status ModelPool::Predict(const std::vector<MSTensor> &inputs, std::vector<MSTen
     return kSuccess;
   } else {
     // do predict
+    if (predict_task_queue_->GetTaskNum(max_wait_worker_node_id) > kNumMaxTaskQueueSize) {
+      MS_LOG(ERROR) << "The number of waiting tasks in the queue exceeds the limit, ret=" << kLiteServiceDeny;
+      predict_task_mutex_.unlock();
+      return kLiteServiceDeny;
+    }
     predict_task_queue_->DecreaseWaitModelNum(1, max_wait_worker_node_id);
     auto predict_task = std::make_shared<PredictTask>(&inputs, outputs, before, after);
     if (predict_task == nullptr) {