!15560 Optimize cache_admin tool to allow multiple session destroy

From: @lixiachen
Reviewed-by: @robingrosman,@nsyca
Signed-off-by: @robingrosman
This commit is contained in:
mindspore-ci-bot 2021-04-27 07:55:29 +08:00 committed by Gitee
commit 83cdb8bb38
5 changed files with 85 additions and 25 deletions

View File

@ -17,6 +17,7 @@
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <unistd.h> #include <unistd.h>
#include <algorithm>
#include <cerrno> #include <cerrno>
#include <iomanip> #include <iomanip>
#include <iostream> #include <iostream>
@ -39,7 +40,6 @@ const char CacheAdminArgHandler::kServerBinary[] = "cache_server";
CacheAdminArgHandler::CacheAdminArgHandler() CacheAdminArgHandler::CacheAdminArgHandler()
: port_(kCfgDefaultCachePort), : port_(kCfgDefaultCachePort),
session_id_(0),
num_workers_(kDefaultNumWorkers), num_workers_(kDefaultNumWorkers),
shm_mem_sz_(kDefaultSharedMemorySizeInGB), shm_mem_sz_(kDefaultSharedMemorySizeInGB),
log_level_(kDefaultLogLevel), log_level_(kDefaultLogLevel),
@ -102,6 +102,52 @@ CacheAdminArgHandler::CacheAdminArgHandler()
CacheAdminArgHandler::~CacheAdminArgHandler() = default; CacheAdminArgHandler::~CacheAdminArgHandler() = default;
Status CacheAdminArgHandler::AssignArg(std::string option, std::vector<uint32_t> *out_arg,
std::stringstream *arg_stream, CommandId command_id) {
// Detect if the user tried to provide this argument more than once
ArgValue selected_arg = arg_map_[option];
if (used_args_[selected_arg]) {
std::string err_msg = "The " + option + " argument was given more than once.";
return Status(StatusCode::kMDSyntaxError, err_msg);
}
// Flag that this arg is used now
used_args_[selected_arg] = true;
// Some options are just arguments, for example "--port 50052" is not a command, it's just a argument.
// Other options are actual commands, for example "--destroy_session 1234". This executes the destroy session.
// If this option is also a command, make sure there has not been multiple commands given before assigning it.
if (command_id != CommandId::kCmdUnknown) {
if (command_id_ != CommandId::kCmdUnknown) {
std::string err_msg = "Only one command at a time is allowed. Invalid command: " + option;
return Status(StatusCode::kMDSyntaxError, err_msg);
} else {
command_id_ = command_id;
}
}
uint32_t value_as_uint;
while (arg_stream->rdbuf()->in_avail() != 0) {
*arg_stream >> value_as_uint;
if (arg_stream->fail()) {
arg_stream->clear();
std::string value_as_string;
*arg_stream >> value_as_string;
std::string err_msg = "Invalid numeric value: " + value_as_string;
return Status(StatusCode::kMDSyntaxError, err_msg);
} else {
out_arg->push_back(value_as_uint);
}
}
if (out_arg->empty()) {
std::string err_msg = option + " option requires an argument field. Syntax: " + option + " <field>";
return Status(StatusCode::kMDSyntaxError, err_msg);
}
return Status::OK();
}
Status CacheAdminArgHandler::AssignArg(std::string option, int32_t *out_arg, std::stringstream *arg_stream, Status CacheAdminArgHandler::AssignArg(std::string option, int32_t *out_arg, std::stringstream *arg_stream,
CommandId command_id) { CommandId command_id) {
// Detect if the user tried to provide this argument more than once // Detect if the user tried to provide this argument more than once
@ -269,11 +315,7 @@ Status CacheAdminArgHandler::ParseArgStream(std::stringstream *arg_stream) {
break; break;
} }
case ArgValue::kArgDestroySession: { case ArgValue::kArgDestroySession: {
// session_id is an unsigned type. We may need to template the AssignArg function so that RETURN_IF_NOT_OK(AssignArg(tok, &session_ids_, arg_stream, CommandId::kCmdDestroySession));
// it can handle different flavours of integers instead of just int32_t.
int32_t session_int;
RETURN_IF_NOT_OK(AssignArg(tok, &session_int, arg_stream, CommandId::kCmdDestroySession));
session_id_ = session_int;
break; break;
} }
case ArgValue::kArgNumWorkers: { case ArgValue::kArgNumWorkers: {
@ -376,11 +418,13 @@ Status CacheAdminArgHandler::RunCommand() {
CacheClientGreeter comm(hostname_, port_, 1); CacheClientGreeter comm(hostname_, port_, 1);
RETURN_IF_NOT_OK(comm.ServiceStart()); RETURN_IF_NOT_OK(comm.ServiceStart());
CacheClientInfo cinfo; CacheClientInfo cinfo;
cinfo.set_session_id(session_id_); for (session_id_type id : session_ids_) {
cinfo.set_session_id(id);
auto rq = std::make_shared<DropSessionRequest>(cinfo); auto rq = std::make_shared<DropSessionRequest>(cinfo);
RETURN_IF_NOT_OK(comm.HandleRequest(rq)); RETURN_IF_NOT_OK(comm.HandleRequest(rq));
RETURN_IF_NOT_OK(rq->Wait()); RETURN_IF_NOT_OK(rq->Wait());
std::cout << "Drop session successfully for server on port " << std::to_string(port_) << std::endl; std::cout << "Drop session " << id << " successfully for server on port " << std::to_string(port_) << std::endl;
}
break; break;
} }
case CommandId::kCmdListSessions: { case CommandId::kCmdListSessions: {

View File

@ -22,6 +22,7 @@
#include <string> #include <string>
#include <sstream> #include <sstream>
#include <thread> #include <thread>
#include <vector>
#include "minddata/dataset/util/status.h" #include "minddata/dataset/util/status.h"
#include "minddata/dataset/engine/cache/cache_client.h" #include "minddata/dataset/engine/cache/cache_client.h"
@ -94,6 +95,9 @@ class CacheAdminArgHandler {
Status AssignArg(std::string option, float *out_arg, std::stringstream *arg_stream, Status AssignArg(std::string option, float *out_arg, std::stringstream *arg_stream,
CommandId command_id = CommandId::kCmdUnknown); CommandId command_id = CommandId::kCmdUnknown);
Status AssignArg(std::string option, std::vector<uint32_t> *out_arg, std::stringstream *arg_stream,
CommandId command_id = CommandId::kCmdUnknown);
Status Validate(); Status Validate();
CommandId command_id_; CommandId command_id_;
@ -102,7 +106,7 @@ class CacheAdminArgHandler {
int32_t shm_mem_sz_; int32_t shm_mem_sz_;
int32_t log_level_; int32_t log_level_;
float memory_cap_ratio_; float memory_cap_ratio_;
session_id_type session_id_; std::vector<session_id_type> session_ids_;
std::string hostname_; std::string hostname_;
std::string spill_dir_; std::string spill_dir_;
std::string trailing_args_; std::string trailing_args_;

View File

@ -332,16 +332,16 @@ bash run_parameter_server_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet201
#### Evaluation while training #### Evaluation while training
```bash ```bash
# evaluation while distributed training Ascend example: # evaluation with distributed training Ascend example:
bash run_distribute_train.sh [resnet18|resnet50|resnet101|se-resnet50] [cifar10|imagenet2012] [RANK_TABLE_FILE] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional) bash run_distribute_train.sh [resnet18|resnet50|resnet101|se-resnet50] [cifar10|imagenet2012] [RANK_TABLE_FILE] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional)
# evaluation while standalone training Ascend example: # evaluation with standalone training Ascend example:
bash run_standalone_train.sh [resnet18|resnet50|resnet101|se-resnet50] [cifar10|imagenet2012] [RANK_TABLE_FILE] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional) bash run_standalone_train.sh [resnet18|resnet50|resnet101|se-resnet50] [cifar10|imagenet2012] [RANK_TABLE_FILE] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional)
# evaluation while distributed training GPU example: # evaluation with distributed training GPU example:
bash run_distribute_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet2012] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional) bash run_distribute_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet2012] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional)
# evaluation while standalone training GPU example: # evaluation with standalone training GPU example:
bash run_standalone_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet2012] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional) bash run_standalone_train_gpu.sh [resnet50|resnet101] [cifar10|imagenet2012] [DATASET_PATH] [RUN_EVAL](optional) [EVAL_DATASET_PATH](optional)
``` ```

View File

@ -34,8 +34,8 @@ def create_dataset1(dataset_path, do_train, repeat_num=1, batch_size=32, target=
batch_size(int): the batch size of dataset. Default: 32 batch_size(int): the batch size of dataset. Default: 32
target(str): the device target. Default: Ascend target(str): the device target. Default: Ascend
distribute(bool): data for distribute or not. Default: False distribute(bool): data for distribute or not. Default: False
enable_cache(bool): whether tensor caching service is used for eval. enable_cache(bool): whether tensor caching service is used for eval. Default: False
cache_session_id(int): If enable_cache, cache session_id need to be provided. cache_session_id(int): If enable_cache, cache session_id need to be provided. Default: None
Returns: Returns:
dataset dataset
@ -104,8 +104,8 @@ def create_dataset2(dataset_path, do_train, repeat_num=1, batch_size=32, target=
batch_size(int): the batch size of dataset. Default: 32 batch_size(int): the batch size of dataset. Default: 32
target(str): the device target. Default: Ascend target(str): the device target. Default: Ascend
distribute(bool): data for distribute or not. Default: False distribute(bool): data for distribute or not. Default: False
enable_cache(bool): whether tensor caching service is used for eval. enable_cache(bool): whether tensor caching service is used for eval. Default: False
cache_session_id(int): If enable_cache, cache session_id need to be provided. cache_session_id(int): If enable_cache, cache session_id need to be provided. Default: None
Returns: Returns:
dataset dataset
@ -182,8 +182,8 @@ def create_dataset3(dataset_path, do_train, repeat_num=1, batch_size=32, target=
batch_size(int): the batch size of dataset. Default: 32 batch_size(int): the batch size of dataset. Default: 32
target(str): the device target. Default: Ascend target(str): the device target. Default: Ascend
distribute(bool): data for distribute or not. Default: False distribute(bool): data for distribute or not. Default: False
enable_cache(bool): whether tensor caching service is used for eval. enable_cache(bool): whether tensor caching service is used for eval. Default: False
cache_session_id(int): If enable_cache, cache session_id need to be provided. cache_session_id(int): If enable_cache, cache session_id need to be provided. Default: None
Returns: Returns:
dataset dataset
@ -259,8 +259,8 @@ def create_dataset4(dataset_path, do_train, repeat_num=1, batch_size=32, target=
batch_size(int): the batch size of dataset. Default: 32 batch_size(int): the batch size of dataset. Default: 32
target(str): the device target. Default: Ascend target(str): the device target. Default: Ascend
distribute(bool): data for distribute or not. Default: False distribute(bool): data for distribute or not. Default: False
enable_cache(bool): whether tensor caching service is used for eval. enable_cache(bool): whether tensor caching service is used for eval. Default: False
cache_session_id(int): If enable_cache, cache session_id need to be provided. cache_session_id(int): If enable_cache, cache session_id need to be provided. Default: None
Returns: Returns:
dataset dataset

View File

@ -16,7 +16,7 @@
# source the globals and functions for use with cache testing # source the globals and functions for use with cache testing
export SKIP_ADMIN_COUNTER=false export SKIP_ADMIN_COUNTER=false
declare failed_tests declare session_id failed_tests
. cachetest_lib.sh . cachetest_lib.sh
echo echo
@ -160,6 +160,18 @@ cmd="${CACHE_ADMIN} -d 99999"
CacheAdminCmd "${cmd}" 1 CacheAdminCmd "${cmd}" 1
HandleRcExit $? 0 0 HandleRcExit $? 0 0
# generate two new sessions to test multi-destroy
GetSession
HandleRcExit $? 0 0
session_id1=$session_id
GetSession
HandleRcExit $? 0 0
session_id2=$session_id
# test multi-session destroy
cmd="${CACHE_ADMIN} -d ${session_id1} ${session_id2}"
CacheAdminCmd "${cmd}" 0
HandleRcExit $? 0 0
# stop cache server at this point # stop cache server at this point
StopServer StopServer
HandleRcExit $? 1 1 HandleRcExit $? 1 1