!11920 Refactor code under engine/cache/perf to pass lizard check

From: @lixiachen
Reviewed-by: @liucunwei,@pandoublefeng
Signed-off-by: @liucunwei
This commit is contained in:
mindspore-ci-bot 2021-02-01 16:08:34 +08:00 committed by Gitee
commit e258315885
4 changed files with 244 additions and 204 deletions

View File

@ -33,6 +33,10 @@
namespace mindspore {
namespace dataset {
const char CachePerfRun::kCachePipelineBinary[] = "cache_pipeline";
// Identifiers handed to getopt_long for the options that only exist in long form
// (--port, --hostname, --connection). They serve as the `val` of the matching
// long_opts entries and as the case labels that dispatch on the returned value.
// The values lie above the single-character range used by the short option letters.
constexpr int32_t port_opt{1000};      // --port
constexpr int32_t hostname_opt{1001};  // --hostname
constexpr int32_t connect_opt{1002};   // --connection
void CachePerfRun::PrintHelp() {
std::cout << "Options:\n"
" -h,--help: Show this usage message\n"
@ -62,156 +66,90 @@ void CachePerfRun::PrintHelp() {
<< " --hostname: Hostname of the cache server. Default = " << kCfgDefaultCacheHost << "\n";
}
int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
if (argc == 1) {
PrintHelp();
return -1;
}
const int32_t port_opt = 1000; // there is no short option for port
const int32_t hostname_opt = 1001; // there is no short option for hostname
const int32_t connect_opt = 1002; // there is no short option for connect
int shuffle = 0;
int spill = 0;
const char *const short_opts = ":n:e:p:a:s:r:w:";
const option long_opts[] = {{"pipeline", required_argument, nullptr, 'n'},
{"epoch", required_argument, nullptr, 'e'},
{"prefetch_size", required_argument, nullptr, 'p'},
{"shuffle", no_argument, &shuffle, 1},
{"cache_size", required_argument, nullptr, 'a'},
{"num_rows", required_argument, nullptr, 's'},
{"row_size", required_argument, nullptr, 'r'},
{"workers", required_argument, nullptr, 'w'},
{"port", required_argument, nullptr, port_opt},
{"hostname", required_argument, nullptr, hostname_opt},
{"spill", no_argument, &spill, 1},
{"connection", required_argument, nullptr, connect_opt},
{"help", no_argument, nullptr, 'h'},
{nullptr, no_argument, nullptr, 0}};
std::map<int32_t, int32_t> seen_opts;
int32_t CachePerfRun::ProcessArgsHelper(int32_t opt) {
int32_t rc = 0;
try {
while (rc == 0) {
int32_t option_indxex;
const auto opt = getopt_long(argc, argv, short_opts, long_opts, &option_indxex);
if (-1 == opt) {
if (optind < argc) {
rc = -1;
std::cerr << "Unknown arguments: ";
while (optind < argc) {
std::cerr << argv[optind++] << " ";
}
std::cerr << std::endl;
}
switch (opt) {
case 'n': {
num_pipelines_ = std::stoi(optarg);
break;
}
if (opt > 0) {
seen_opts[opt]++;
if (seen_opts[opt] > 1) {
std::string long_name = long_opts[option_indxex].name;
std::cerr << "The " << long_name << " argument was given more than once." << std::endl;
rc = -1;
continue;
}
case 'e': {
num_epoches_ = std::stoi(optarg);
break;
}
switch (opt) {
case 0: {
if (long_opts[option_indxex].flag == &shuffle) {
shuffle_ = true;
} else if (long_opts[option_indxex].flag == &spill) {
cache_builder_.SetSpill(true);
}
break;
}
case 'n': {
num_pipelines_ = std::stoi(optarg);
break;
}
case 'e': {
num_epoches_ = std::stoi(optarg);
break;
}
case 'p': {
int32_t prefetch_sz = std::stoi(optarg);
cache_builder_.SetPrefetchSize(prefetch_sz);
break;
}
case 'a': {
int32_t cache_sz = std::stoi(optarg);
cache_builder_.SetCacheMemSz(cache_sz);
break;
}
case 's': {
num_rows_ = std::stoi(optarg);
break;
}
case 'r': {
row_size_ = std::stoi(optarg);
break;
}
case 'w': {
cfg_.set_num_parallel_workers(std::stoi(optarg));
break;
}
case connect_opt: {
int32_t connection_sz = std::stoi(optarg);
cache_builder_.SetNumConnections(connection_sz);
break;
}
case port_opt: {
int32_t port = std::stoi(optarg);
cache_builder_.SetPort(port);
break;
}
case hostname_opt: {
std::string hostname = optarg;
cache_builder_.SetHostname(hostname);
break;
}
case 'h': // -h or --help
PrintHelp();
rc = -1;
break;
case ':':
std::cerr << "Missing argument for option " << char(optopt) << std::endl;
rc = -1;
break;
case '?': // Unrecognized option
default:
std::cerr << "Unknown option " << char(optopt) << std::endl;
PrintHelp();
rc = -1;
break;
case 'p': {
int32_t prefetch_sz = std::stoi(optarg);
cache_builder_.SetPrefetchSize(prefetch_sz);
break;
}
case 'a': {
int32_t cache_sz = std::stoi(optarg);
cache_builder_.SetCacheMemSz(cache_sz);
break;
}
case 's': {
num_rows_ = std::stoi(optarg);
break;
}
case 'r': {
row_size_ = std::stoi(optarg);
break;
}
case 'w': {
cfg_.set_num_parallel_workers(std::stoi(optarg));
break;
}
case connect_opt: {
int32_t connection_sz = std::stoi(optarg);
cache_builder_.SetNumConnections(connection_sz);
break;
}
case port_opt: {
int32_t port = std::stoi(optarg);
cache_builder_.SetPort(port);
break;
}
case hostname_opt: {
std::string hostname = optarg;
cache_builder_.SetHostname(hostname);
break;
}
case 'h': // -h or --help
PrintHelp();
rc = -1;
break;
case ':':
std::cerr << "Missing argument for option " << char(optopt) << std::endl;
rc = -1;
break;
case '?': // Unrecognized option
default:
std::cerr << "Unknown option " << char(optopt) << std::endl;
PrintHelp();
rc = -1;
break;
}
} catch (const std::exception &e) {
PrintHelp();
rc = -1;
}
return rc;
}
if (rc < 0) {
return rc;
}
int32_t CachePerfRun::SanityCheck(std::map<int32_t, int32_t> seen_opts) {
// We have all the defaults except sample size and average row size which the user must specify.
auto it = seen_opts.find('s');
if (it == seen_opts.end()) {
@ -249,9 +187,84 @@ int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
std::cerr << "Sample size is smaller than the number of pipelines." << std::endl;
return -1;
}
return 0;
}
// Parse the command line of the cache_perf executable.
//
// Each option may be given at most once. Flag-style options (--shuffle, --spill) are
// handled here; every other recognized option is forwarded to ProcessArgsHelper, and the
// collected set of seen options is finally validated by SanityCheck.
//
// \param argc Number of entries in argv.
// \param argv Argument vector as received by main().
// \return 0 on success, a negative value if help was requested or the arguments are invalid.
int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
  if (argc == 1) {
    PrintHelp();
    return -1;
  }

  int shuffle = 0;
  int spill = 0;
  // The leading ':' makes getopt_long return ':' when an option argument is missing.
  // 'h' is listed so that -h is accepted like --help instead of being reported as unknown.
  const char *const short_opts = ":n:e:p:a:s:r:w:h";
  const option long_opts[] = {{"pipeline", required_argument, nullptr, 'n'},
                              {"epoch", required_argument, nullptr, 'e'},
                              {"prefetch_size", required_argument, nullptr, 'p'},
                              {"shuffle", no_argument, &shuffle, 1},
                              {"cache_size", required_argument, nullptr, 'a'},
                              {"num_rows", required_argument, nullptr, 's'},
                              {"row_size", required_argument, nullptr, 'r'},
                              {"workers", required_argument, nullptr, 'w'},
                              {"port", required_argument, nullptr, port_opt},
                              {"hostname", required_argument, nullptr, hostname_opt},
                              {"spill", no_argument, &spill, 1},
                              {"connection", required_argument, nullptr, connect_opt},
                              {"help", no_argument, nullptr, 'h'},
                              {nullptr, no_argument, nullptr, 0}};

  // Maps the value returned by getopt_long to the number of times it was seen.
  std::map<int32_t, int32_t> seen_opts;
  int32_t rc = 0;
  try {
    while (rc == 0) {
      // getopt_long only writes the index when a *long* option matched. Start from a
      // sentinel so that a short option never leaves us with an indeterminate index.
      int option_index = -1;
      const auto opt = getopt_long(argc, argv, short_opts, long_opts, &option_index);
      if (opt == -1) {
        // End of options: anything left over is a positional argument we do not accept.
        if (optind < argc) {
          rc = -1;
          std::cerr << "Unknown arguments: ";
          while (optind < argc) {
            std::cerr << argv[optind++] << " ";
          }
          std::cerr << std::endl;
        }
        break;
      }
      if (opt == 0) {
        // A flag-style long option; getopt_long has already stored 1 into its flag variable.
        if (option_index >= 0) {
          if (long_opts[option_index].flag == &shuffle) {
            shuffle_ = true;
          } else if (long_opts[option_index].flag == &spill) {
            cache_builder_.SetSpill(true);
          }
        }
        continue;
      }
      if (opt > 0) {
        seen_opts[opt]++;
        if (seen_opts[opt] > 1) {
          std::string long_name;
          if (option_index >= 0) {
            long_name = long_opts[option_index].name;
          } else {
            // The duplicate was given in short form: look up its long spelling by value.
            for (const option *entry = long_opts; entry->name != nullptr; ++entry) {
              if (entry->flag == nullptr && entry->val == opt) {
                long_name = entry->name;
                break;
              }
            }
          }
          if (long_name.empty()) {
            long_name = std::string(1, static_cast<char>(opt));
          }
          std::cerr << "The " << long_name << " argument was given more than once." << std::endl;
          rc = -1;
          continue;
        }
      }
      rc = ProcessArgsHelper(opt);
    }
  } catch (const std::exception &e) {
    std::cerr << "Parse error: " << e.what() << std::endl;
    PrintHelp();
    rc = -1;
  }
  if (rc < 0) return rc;

  rc = SanityCheck(seen_opts);
  if (rc < 0) return rc;

  pid_lists_.reserve(num_pipelines_);
  return 0;
}
@ -392,42 +405,7 @@ Status CachePerfRun::ListenToPipeline(int32_t workerId) {
return Status::OK();
}
Status CachePerfRun::Run() {
// Now we bring up TaskManager.
RETURN_IF_NOT_OK(Services::CreateInstance());
// Handle Control-C
RegisterHandlers();
// Get a session from the server.
RETURN_IF_NOT_OK(GetSession());
// Generate a random crc.
auto mt = GetRandomDevice();
std::uniform_int_distribution<session_id_type> distribution(0, std::numeric_limits<int32_t>::max());
crc_ = distribution(mt);
std::cout << "CRC: " << crc_ << std::endl;
// Create all the resources required by the pipelines before we fork.
for (auto i = 0; i < num_pipelines_; ++i) {
// We will use shared message queues for communication between parent (this process)
// and each pipelines.
auto access_mode = S_IRUSR | S_IWUSR;
int32_t msg_send_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
if (msg_send_qid == -1) {
std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
RETURN_STATUS_UNEXPECTED(errMsg);
}
msg_send_lists_.push_back(msg_send_qid);
int32_t msg_recv_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
if (msg_recv_qid == -1) {
std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
RETURN_STATUS_UNEXPECTED(errMsg);
}
msg_recv_lists_.push_back(msg_recv_qid);
}
// Now we create the children knowing all two sets of message queues are constructed.
auto start_tick = std::chrono::steady_clock::now();
Status CachePerfRun::StartPipelines() {
for (auto i = 0; i < num_pipelines_; ++i) {
auto pid = fork();
if (pid == 0) {
@ -492,10 +470,64 @@ Status CachePerfRun::Run() {
RETURN_STATUS_UNEXPECTED(errMsg);
}
}
return Status::OK();
}
// Tear down what the run created on the cache server: destroy the cache through the
// cache client, then submit a DropSessionRequest for the current session and wait on it.
// Any failing step is returned to the caller immediately; on success session_ is reset to 0.
Status CachePerfRun::Cleanup() {
  // The cache itself is no longer needed.
  RETURN_IF_NOT_OK(cc_->DestroyCache());

  // Release the session that was reserved for this run.
  CacheClientInfo client_info;
  client_info.set_session_id(session_);
  auto drop_request = std::make_shared<DropSessionRequest>(client_info);
  RETURN_IF_NOT_OK(cc_->PushRequest(drop_request));
  RETURN_IF_NOT_OK(drop_request->Wait());

  std::cout << "Drop session " << session_ << " successful" << std::endl;
  session_ = 0;
  return Status::OK();
}
Status CachePerfRun::Run() {
// Now we bring up TaskManager.
RETURN_IF_NOT_OK(Services::CreateInstance());
// Handle Control-C
RegisterHandlers();
// Get a session from the server.
RETURN_IF_NOT_OK(GetSession());
// Generate a random crc.
auto mt = GetRandomDevice();
std::uniform_int_distribution<session_id_type> distribution(0, std::numeric_limits<int32_t>::max());
crc_ = distribution(mt);
std::cout << "CRC: " << crc_ << std::endl;
// Create all the resources required by the pipelines before we fork.
for (auto i = 0; i < num_pipelines_; ++i) {
// We will use shared message queues for communication between parent (this process)
// and each pipelines.
auto access_mode = S_IRUSR | S_IWUSR;
int32_t msg_send_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
if (msg_send_qid == -1) {
std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
RETURN_STATUS_UNEXPECTED(errMsg);
}
msg_send_lists_.push_back(msg_send_qid);
int32_t msg_recv_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
if (msg_recv_qid == -1) {
std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
RETURN_STATUS_UNEXPECTED(errMsg);
}
msg_recv_lists_.push_back(msg_recv_qid);
}
// Now we create the children knowing all two sets of message queues are constructed.
auto start_tick = std::chrono::steady_clock::now();
RETURN_IF_NOT_OK(StartPipelines());
// Spawn a few threads to monitor the communications from the pipeline.
RETURN_IF_NOT_OK(vg_.ServiceStart());
auto f = std::bind(&CachePerfRun::ListenToPipeline, this, std::placeholders::_1);
for (auto i = 0; i < num_pipelines_; ++i) {
RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Queue listener", std::bind(f, i)));
@ -526,14 +558,10 @@ Status CachePerfRun::Run() {
std::cout << "Get statistics for this session:\n";
std::cout << std::setw(12) << "Mem cached" << std::setw(12) << "Disk cached" << std::setw(16) << "Avg cache size"
<< std::setw(10) << "Numa hit" << std::endl;
std::string stat_mem_cached;
std::string stat_disk_cached;
std::string stat_avg_cached;
std::string stat_numa_hit;
stat_mem_cached = (stat.num_mem_cached == 0) ? "n/a" : std::to_string(stat.num_mem_cached);
stat_disk_cached = (stat.num_disk_cached == 0) ? "n/a" : std::to_string(stat.num_disk_cached);
stat_avg_cached = (stat.avg_cache_sz == 0) ? "n/a" : std::to_string(stat.avg_cache_sz);
stat_numa_hit = (stat.num_numa_hit == 0) ? "n/a" : std::to_string(stat.num_numa_hit);
std::string stat_mem_cached = (stat.num_mem_cached == 0) ? "n/a" : std::to_string(stat.num_mem_cached);
std::string stat_disk_cached = (stat.num_disk_cached == 0) ? "n/a" : std::to_string(stat.num_disk_cached);
std::string stat_avg_cached = (stat.avg_cache_sz == 0) ? "n/a" : std::to_string(stat.avg_cache_sz);
std::string stat_numa_hit = (stat.num_numa_hit == 0) ? "n/a" : std::to_string(stat.num_numa_hit);
std::cout << std::setw(12) << stat_mem_cached << std::setw(12) << stat_disk_cached << std::setw(16) << stat_avg_cached
<< std::setw(10) << stat_numa_hit << std::endl;
@ -566,18 +594,8 @@ Status CachePerfRun::Run() {
++epoch_num;
}
// Destroy the cache. We no longer need it around.
RETURN_IF_NOT_OK(cc_->DestroyCache());
// Unreserve the session
CacheClientInfo cinfo;
cinfo.set_session_id(session_);
auto rq = std::make_shared<DropSessionRequest>(cinfo);
RETURN_IF_NOT_OK(cc_->PushRequest(rq));
RETURN_IF_NOT_OK(rq->Wait());
std::cout << "Drop session " << session_ << " successful" << std::endl;
session_ = 0;
// Destroy the cache client and drop the session
RETURN_IF_NOT_OK(Cleanup());
return Status::OK();
}
} // namespace dataset

View File

@ -93,6 +93,10 @@ class CachePerfRun {
Status GetSession();
Status ListenToPipeline(int32_t workerId);
void PrintEpochSummary() const;
// Forks once per pipeline (num_pipelines_ iterations) and reports failures as a Status.
Status StartPipelines();
// Destroys the cache, drops the current session on the server and resets session_ to 0.
Status Cleanup();
// Validates the parsed command line. seen_opts maps each option value returned by
// getopt_long to the number of times it appeared. Returns 0 if the options are
// acceptable and -1 otherwise.
// NOTE(review): the map is taken by value; a const reference would avoid the copy.
int32_t SanityCheck(std::map<int32_t, int32_t> seen_opts);
// Handles a single option value returned by getopt_long (its argument is read from
// optarg). Returns 0 on success and -1 on error or when help was printed.
int32_t ProcessArgsHelper(int32_t opt);
};
} // namespace dataset
} // namespace mindspore

View File

@ -31,14 +31,9 @@ namespace mindspore {
namespace dataset {
void CachePipelineRun::PrintHelp() { std::cout << "Please run the executable cache_perf instead." << std::endl; }
int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
if (argc != 3) {
PrintHelp();
return -1;
}
int32_t CachePipelineRun::ProcessPipelineArgs(char *argv) {
try {
std::stringstream cfg_ss(argv[1]);
std::stringstream cfg_ss(argv);
std::string s;
int32_t numArgs = 0;
while (std::getline(cfg_ss, s, ',')) {
@ -72,8 +67,18 @@ int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
std::cerr << "Incomplete arguments. Expect 11. But get " << numArgs << std::endl;
return -1;
}
std::stringstream client_ss(argv[2]);
numArgs = 0;
} catch (const std::exception &e) {
std::cerr << "Parse error: " << e.what() << std::endl;
return -1;
}
return 0;
}
int32_t CachePipelineRun::ProcessClientArgs(char *argv) {
try {
std::stringstream client_ss(argv);
std::string s;
int32_t numArgs = 0;
while (std::getline(client_ss, s, ',')) {
if (numArgs == 0) {
cache_builder_.SetHostname(s);
@ -101,6 +106,17 @@ int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
return 0;
}
// Entry point for argument parsing of the pipeline executable. Exactly two arguments are
// expected after the program name: argv[1] is handed to ProcessPipelineArgs and argv[2]
// to ProcessClientArgs. The client string is only parsed if the pipeline string parsed
// without error. Returns -1 (after printing the help text) on a wrong argument count,
// otherwise the result of the last parser that ran.
int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
  const int kExpectedArgc = 3;
  if (argc == kExpectedArgc) {
    const int32_t pipeline_rc = ProcessPipelineArgs(argv[1]);
    return (pipeline_rc < 0) ? pipeline_rc : ProcessClientArgs(argv[2]);
  }
  PrintHelp();
  return -1;
}
CachePipelineRun::CachePipelineRun()
: my_pipeline_(-1),
num_pipelines_(kDftNumOfPipelines),
@ -282,7 +298,7 @@ Status CachePipelineRun::WriterWorkerEntry(int32_t worker_id) {
std::shared_ptr<Tensor> element;
RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, col_desc->type(), &element));
row.setId(id);
// CreateEmpty allocates the memory but in virutal address. Let's commit the memory
// CreateEmpty allocates the memory but in virtual address. Let's commit the memory
// so we can get an accurate timing.
auto it = element->begin<int64_t>();
for (auto i = 0; i < num_elements; ++i, ++it) {

View File

@ -52,6 +52,8 @@ class CachePipelineRun {
~CachePipelineRun();
static void PrintHelp();
int32_t ProcessArgs(int argc, char **argv);
// Parses the comma-separated pipeline settings string (argv[1] of the executable).
// Returns 0 on success and -1 if the string cannot be parsed or is incomplete.
int32_t ProcessPipelineArgs(char *argv);
// Parses the comma-separated cache client settings string (argv[2] of the executable);
// the first field is the cache server hostname. Returns 0 on success.
// NOTE(review): failure presumably yields a negative value like ProcessPipelineArgs - confirm.
int32_t ProcessClientArgs(char *argv);
void Print(std::ostream &out) const {
out << "Number of pipelines: " << num_pipelines_ << "\n"