Add new command for displaying server configurations

This commit is contained in:
tinazhang 2021-01-28 15:37:56 -05:00
parent be6d310946
commit a559fc7f5f
8 changed files with 104 additions and 10 deletions

View File

@ -73,6 +73,7 @@ CacheAdminArgHandler::CacheAdminArgHandler()
arg_map_["-r"] = ArgValue::kArgMemoryCapRatio;
arg_map_["--memory_cap_ratio"] = ArgValue::kArgMemoryCapRatio;
arg_map_["--list_sessions"] = ArgValue::kArgListSessions;
arg_map_["--server_info"] = ArgValue::kArgServerInfo;
// Initialize argument tracker with false values
for (int16_t i = 0; i < static_cast<int16_t>(ArgValue::kArgNumArgs); ++i) {
ArgValue currAV = static_cast<ArgValue>(i);
@ -280,6 +281,10 @@ Status CacheAdminArgHandler::ParseArgStream(std::stringstream *arg_stream) {
RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdListSessions));
break;
}
case ArgValue::kArgServerInfo: {
RETURN_IF_NOT_OK(AssignArg(tok, static_cast<std::string *>(nullptr), arg_stream, CommandId::kCmdServerInfo));
break;
}
default: {
// Save space delimited trailing arguments
trailing_args_ += (" " + tok);
@ -396,6 +401,10 @@ Status CacheAdminArgHandler::RunCommand() {
}
break;
}
case CommandId::kCmdServerInfo: {
RETURN_IF_NOT_OK(ShowServerInfo());
break;
}
default: {
RETURN_STATUS_UNEXPECTED("Invalid cache admin command id.");
break;
@ -405,6 +414,49 @@ Status CacheAdminArgHandler::RunCommand() {
return Status::OK();
}
Status CacheAdminArgHandler::ShowServerInfo() {
CacheClientGreeter comm(hostname_, port_, 1);
RETURN_IF_NOT_OK(comm.ServiceStart());
auto rq = std::make_shared<ListSessionsRequest>();
RETURN_IF_NOT_OK(comm.HandleRequest(rq));
RETURN_IF_NOT_OK(rq->Wait());
auto session_ids = rq->GetSessionIds();
auto server_cfg_info = rq->GetServerStat();
int32_t num_workers = server_cfg_info.num_workers;
int8_t log_level = server_cfg_info.log_level;
std::string spill_dir = server_cfg_info.spill_dir;
int name_w = 20;
int value_w = 15;
std::cout << "Cache Server Configuration: " << std::endl;
std::cout << "----------------------------------------" << std::endl;
std::cout << std::setw(name_w) << "config name" << std::setw(value_w) << "value" << std::endl;
std::cout << "----------------------------------------" << std::endl;
std::cout << std::setw(name_w) << "hostname" << std::setw(value_w) << hostname_ << std::endl;
std::cout << std::setw(name_w) << "port" << std::setw(value_w) << port_ << std::endl;
std::cout << std::setw(name_w) << "number of workers" << std::setw(value_w) << std::to_string(num_workers)
<< std::endl;
std::cout << std::setw(name_w) << "log level" << std::setw(value_w) << std::to_string(log_level) << std::endl;
if (spill_dir.empty()) {
std::cout << std::setw(name_w) << "spill" << std::setw(value_w) << "disabled" << std::endl;
} else {
std::cout << std::setw(name_w) << "spill dir" << std::setw(value_w) << spill_dir << std::endl;
}
std::cout << "----------------------------------------" << std::endl;
std::cout << "Active sessions: " << std::endl;
if (!session_ids.empty()) {
for (auto session_id : session_ids) {
std::cout << session_id << " ";
}
std::cout << std::endl << "(Please use 'cache_admin --list_session' to get detailed info of sessions.)\n";
} else {
std::cout << "No active sessions." << std::endl;
}
return Status::OK();
}
Status CacheAdminArgHandler::StopServer(CommandId command_id) {
CacheClientGreeter comm(hostname_, port_, 1);
RETURN_IF_NOT_OK(comm.ServiceStart());
@ -550,6 +602,8 @@ void CacheAdminArgHandler::Help() {
std::cerr << " [[-p | --port] <port number>]\n";
std::cerr << " [--list_sessions]\n";
std::cerr << " [[-p | --port] <port number>]\n";
std::cerr << " [--server_info]\n";
std::cerr << " [[-p | --port] <port number>]\n";
std::cerr << " [--help]" << std::endl;
// Do not expose these option to the user via help or documentation, but the options do exist to aid with
// development and tuning.

View File

@ -44,6 +44,7 @@ class CacheAdminArgHandler {
kCmdGenerateSession = 3,
kCmdDestroySession = 4,
kCmdListSessions = 5,
kCmdServerInfo = 6,
kCmdUnknown = 32767
};
@ -74,13 +75,16 @@ class CacheAdminArgHandler {
kArgLogLevel = 11,
kArgMemoryCapRatio = 12,
kArgListSessions = 13,
kArgNumArgs = 14 // Must be the last position to provide a count
kArgServerInfo = 14,
kArgNumArgs = 15 // Must be the last position to provide a count
};
Status StartServer(CommandId command_id);
Status StopServer(CommandId command_id);
Status ShowServerInfo();
Status AssignArg(std::string option, int32_t *out_arg, std::stringstream *arg_stream,
CommandId command_id = CommandId::kCmdUnknown);

View File

@ -42,6 +42,7 @@ ds::Status StartServer(int argc, char **argv) {
.SetNumWorkers(strtol(argv[2], nullptr, 10))
.SetPort(port)
.SetSharedMemorySizeInGB(strtol(argv[4], nullptr, 10))
.SetLogLevel(strtol(argv[5], nullptr, 10))
.SetMemoryCapRatio(strtof(argv[7], nullptr));
auto daemonize_string = argv[6];
@ -119,7 +120,6 @@ ds::Status StartServer(int argc, char **argv) {
// Dump the summary
MS_LOG(WARNING) << "Cache server has started successfully and is listening on port " << port << std::endl;
MS_LOG(WARNING) << "Logging services started with log level: " << argv[5];
MS_LOG(WARNING) << builder << std::endl;
// Create the instance with some sanity checks built in
rc = builder.Build();

View File

@ -333,7 +333,9 @@ Status ListSessionsRequest::PostReply() {
current_info.stats = stats; // fixed length struct. = operator is safe
session_info_list_.push_back(current_info);
}
server_cfg_.num_workers = msg->num_workers();
server_cfg_.log_level = msg->log_level();
server_cfg_.spill_dir = msg->spill_dir()->str();
return Status::OK();
}

View File

@ -47,6 +47,12 @@ struct CacheServiceStat {
int8_t cache_service_state;
};
struct CacheServerCfgInfo {
int32_t num_workers;
int8_t log_level;
std::string spill_dir;
};
/// \brief Info structure ListSessionsRequest
struct SessionCacheInfo {
session_id_type session_id;
@ -410,8 +416,19 @@ class ListSessionsRequest : public BaseRequest {
std::vector<SessionCacheInfo> GetSessionCacheInfo() { return session_info_list_; }
std::vector<session_id_type> GetSessionIds() {
std::vector<session_id_type> session_ids;
for (auto session_info : session_info_list_) {
session_ids.push_back(session_info.session_id);
}
return session_ids;
}
CacheServerCfgInfo GetServerStat() { return server_cfg_; }
private:
std::vector<SessionCacheInfo> session_info_list_;
CacheServerCfgInfo server_cfg_{};
};
class AllocateSharedBlockRequest : public BaseRequest {

View File

@ -726,9 +726,14 @@ Status CacheServer::ListSessions(CacheReply *reply) {
session_msgs_vector.push_back(current_session_info);
}
}
flatbuffers::Offset<flatbuffers::String> spill_dir;
spill_dir = fbb.CreateString(top_);
auto session_msgs = fbb.CreateVector(session_msgs_vector);
ListSessionsMsgBuilder s_builder(fbb);
s_builder.add_sessions(session_msgs);
s_builder.add_num_workers(num_workers_);
s_builder.add_log_level(log_level_);
s_builder.add_spill_dir(spill_dir);
auto offset = s_builder.Finish();
fbb.Finish(offset);
reply->set_result(fbb.GetBufferPointer(), fbb.GetSize());
@ -988,7 +993,7 @@ session_id_type CacheServer::GetSessionID(connection_id_type connection_id) cons
}
CacheServer::CacheServer(const std::string &spill_path, int32_t num_workers, int32_t port,
int32_t shared_meory_sz_in_gb, float memory_cap_ratio)
int32_t shared_meory_sz_in_gb, float memory_cap_ratio, int8_t log_level)
: top_(spill_path),
num_workers_(num_workers),
num_grpc_workers_(num_workers_),
@ -996,7 +1001,8 @@ CacheServer::CacheServer(const std::string &spill_path, int32_t num_workers, int
shared_memory_sz_in_gb_(shared_meory_sz_in_gb),
global_shutdown_(false),
memory_cap_ratio_(memory_cap_ratio),
numa_affinity_(true) {
numa_affinity_(true),
log_level_(log_level) {
hw_info_ = std::make_shared<CacheServerHW>();
// If we are not linked with numa library (i.e. NUMA_ENABLED is false), turn off cpu
// affinity which can make performance worse.

View File

@ -70,6 +70,7 @@ class CacheServer : public Service {
int32_t GetPort() const { return port_; }
int32_t GetSharedMemorySzInGb() const { return shared_memory_sz_in_gb_; }
float GetMemoryCapRatio() const { return memory_cap_ratio_; }
int8_t GetLogLevel() const { return log_level_; }
Builder &SetRootDirectory(std::string root) {
top_ = std::move(root);
@ -91,6 +92,10 @@ class CacheServer : public Service {
memory_cap_ratio_ = ratio;
return *this;
}
Builder &SetLogLevel(int8_t log_level) {
log_level_ = log_level;
return *this;
}
Status SanityCheck();
@ -100,7 +105,8 @@ class CacheServer : public Service {
<< "Number of parallel workers: " << GetNumWorkers() << "\n"
<< "Tcp/ip port: " << GetPort() << "\n"
<< "Shared memory size (in GB): " << GetSharedMemorySzInGb() << "\n"
<< "Memory cap ratio: " << GetMemoryCapRatio();
<< "Memory cap ratio: " << GetMemoryCapRatio() << "\n"
<< "Log level: " << GetLogLevel();
}
friend std::ostream &operator<<(std::ostream &out, const Builder &bld) {
@ -113,7 +119,7 @@ class CacheServer : public Service {
// We need to bring up the Task Manager by bringing up the Services singleton.
RETURN_IF_NOT_OK(Services::CreateInstance());
RETURN_IF_NOT_OK(
CacheServer::CreateInstance(top_, num_workers_, port_, shared_memory_sz_in_gb_, memory_cap_ratio_));
CacheServer::CreateInstance(top_, num_workers_, port_, shared_memory_sz_in_gb_, memory_cap_ratio_, log_level_));
return Status::OK();
}
@ -123,6 +129,7 @@ class CacheServer : public Service {
int32_t port_;
int32_t shared_memory_sz_in_gb_;
float memory_cap_ratio_;
int8_t log_level_;
/// \brief Sanity checks on the shared memory.
/// \return Status object
@ -138,11 +145,11 @@ class CacheServer : public Service {
~CacheServer() override { (void)ServiceStop(); }
static Status CreateInstance(const std::string &spill_path, int32_t num_workers, int32_t port,
int32_t shared_memory_sz, float memory_cap_ratio) {
int32_t shared_memory_sz, float memory_cap_ratio, int8_t log_level) {
std::call_once(init_instance_flag_, [&]() -> Status {
auto &SvcManager = Services::GetInstance();
RETURN_IF_NOT_OK(
SvcManager.AddHook(&instance_, spill_path, num_workers, port, shared_memory_sz, memory_cap_ratio));
SvcManager.AddHook(&instance_, spill_path, num_workers, port, shared_memory_sz, memory_cap_ratio, log_level));
return Status::OK();
});
return Status::OK();
@ -254,6 +261,7 @@ class CacheServer : public Service {
int32_t num_grpc_workers_;
int32_t port_;
int32_t shared_memory_sz_in_gb_;
int8_t log_level_; // log_level is saved here for informational purpose only. It's not a functional field.
std::atomic<bool> global_shutdown_;
float memory_cap_ratio_;
std::shared_ptr<CacheServerHW> hw_info_;
@ -266,7 +274,7 @@ class CacheServer : public Service {
/// \param spill_path Top directory for spilling buffers to.
/// \param num_workers Number of threads for handling requests.
explicit CacheServer(const std::string &spill_path, int32_t num_workers, int32_t port, int32_t share_memory_sz_in_gb,
float memory_cap_ratio);
float memory_cap_ratio, int8_t log_level);
/// \brief Locate a cache service from connection id.
/// \return Pointer to cache service. Null if not found

View File

@ -104,6 +104,9 @@ table ListSessionMsg {
table ListSessionsMsg {
sessions:[ListSessionMsg];
num_workers:int32;
log_level:int8;
spill_dir:string;
}
table DataLocatorMsg {