Detect interrupt and cleanup BatchWait

and other minor changes per the request of HQ testers
Lixia Chen 2020-12-10 17:38:45 -05:00
parent d88aa05859
commit b4b73b16c9
4 changed files with 21 additions and 11 deletions


@@ -298,6 +298,7 @@ Status CacheAdminArgHandler::Validate() {
// Any unhandled arguments at this point is an error.
if (!trailing_args_.empty()) {
std::string err_msg = "Invalid arguments provided: " + trailing_args_;
+err_msg += "\nPlease try `cache_admin --help` for more information";
return Status(StatusCode::kSyntaxError, err_msg);
}
@@ -540,8 +541,11 @@ void CacheAdminArgHandler::Help() {
std::cerr << " [[-s | --spilldir] <spilling directory>] Default is " << DefaultSpillDir() << ".\n";
std::cerr << " [[-l | --loglevel] <log level>] Default is 1 (warning level).\n";
std::cerr << " [--destroy_session | -d] <session id>\n";
+std::cerr << " [[-p | --port] <port number>]\n";
std::cerr << " [--generate_session | -g]\n";
+std::cerr << " [[-p | --port] <port number>]\n";
std::cerr << " [--list_sessions]\n";
+std::cerr << " [[-p | --port] <port number>]\n";
std::cerr << " [--help]" << std::endl;
// Do not expose these option to the user via help or documentation, but the options do exist to aid with
// development and tuning.
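The reordered help text above pairs an optional port override with each session action. As a hedged illustration only (the port value below is a placeholder, not something introduced by this commit), typical invocations would look like:

  cache_admin --generate_session -p 50052
  cache_admin --list_sessions -p 50052
  cache_admin --destroy_session <session id> -p 50052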


@@ -323,6 +323,7 @@ Status CacheClient::BuildPhaseDone() const {
Status CacheClient::PushRequest(std::shared_ptr<BaseRequest> rq) const { return comm_->HandleRequest(std::move(rq)); }
void CacheClient::ServerRunningOutOfResources() {
+MS_LOG(WARNING) << "Cache server runs out of memory or disk space to cache any more rows!\n";
bool expected = true;
if (fetch_all_keys_.compare_exchange_strong(expected, false)) {
Status rc;
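For context on the guard above: compare_exchange_strong turns the true-to-false transition of fetch_all_keys_ into a one-shot event, so only the first caller reacts to the out-of-resources condition even if many worker threads report it. A minimal standalone sketch of that pattern, using illustrative names rather than the MindSpore API:

  #include <atomic>
  #include <iostream>
  #include <thread>
  #include <vector>

  // Stand-in for fetch_all_keys_: many threads may see the condition, only one reacts.
  std::atomic<bool> fetch_all_keys{true};

  void OnServerOutOfResources() {
    bool expected = true;
    // Only the thread that wins the true -> false exchange runs the handler.
    if (fetch_all_keys.compare_exchange_strong(expected, false)) {
      std::cout << "switching to partial caching exactly once\n";
    }
  }

  int main() {
    std::vector<std::thread> pool;
    for (int i = 0; i < 8; ++i) pool.emplace_back(OnServerOutOfResources);
    for (auto &t : pool) t.join();
    return 0;
  }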
@@ -509,7 +510,12 @@ Status CacheClient::AsyncBufferStream::SyncFlush(bool blocking) {
} else {
// Some clients are late and aren't done yet. Let go of the lock.
lock.Unlock();
-retry = true;
+if (this_thread::is_interrupted()) {
+  retry = false;
+  flush_rc_ = Status(StatusCode::kInterrupted);
+} else {
+  retry = true;
+}
writer_wp_.Set();
std::this_thread::yield();
}
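The SyncFlush change above stops the flush from spinning on late writers once the thread has been interrupted, recording kInterrupted instead of retrying forever. A minimal standalone sketch of that retry shape, with a plain atomic flag standing in for this_thread::is_interrupted() and an int return code standing in for Status (names are illustrative, not the MindSpore API):

  #include <atomic>
  #include <chrono>
  #include <iostream>
  #include <thread>

  std::atomic<bool> interrupted{false};    // stand-in for this_thread::is_interrupted()
  std::atomic<int> writers_in_flight{3};   // writers that have not released the buffer yet

  // Returns 0 on success, -1 when the wait is abandoned because of an interrupt.
  int SyncFlush() {
    for (;;) {
      if (writers_in_flight.load() == 0) {
        return 0;                          // all writers done, the flush can proceed
      }
      if (interrupted.load()) {
        return -1;                         // stand-in for StatusCode::kInterrupted
      }
      std::this_thread::yield();           // give the late writers a chance to finish
    }
  }

  int main() {
    std::thread t([] {
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
      interrupted = true;                  // simulate the task group being interrupted
    });
    int rc = SyncFlush();
    t.join();
    std::cout << (rc == 0 ? "flushed" : "interrupted") << "\n";
    return 0;
  }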


@@ -99,7 +99,6 @@ ds::Status StartServer(int argc, char **argv) {
std::cout << "\nRecommendation:\nSince the server is detached into its own daemon process, monitor the server "
"logs (under "
<< ds::DefaultLogDir() << ") for any issues that may happen after startup\n";
-MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl;
signal(SIGCHLD, SIG_IGN); // ignore sig child signal.
return ds::Status::OK();
} else {
@@ -119,6 +118,7 @@
}
// Dump the summary
+MS_LOG(INFO) << "Cache server has started successfully and is listening on port " << port << std::endl;
MS_LOG(WARNING) << "Logging services started with log level: " << argv[5];
MS_LOG(WARNING) << builder << std::endl;
// Create the instance with some sanity checks built in


@@ -403,7 +403,7 @@ Status CacheServer::BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuil
auto p = flatbuffers::GetRoot<BatchDataLocatorMsg>(fbb->GetBufferPointer());
const auto num_elements = p->rows()->size();
auto connection_id = p->connection_id();
-BatchWait batch_wait = BatchWait(num_elements);
+auto batch_wait = std::make_unique<BatchWait>(num_elements);
int64_t data_offset = (num_elements + 1) * sizeof(int64_t);
auto *offset_array = reinterpret_cast<int64_t *>(out->GetMutablePointer());
offset_array[0] = data_offset;
@@ -439,17 +439,17 @@ Status CacheServer::BatchFetch(const std::shared_ptr<flatbuffers::FlatBufferBuil
auto offset = bld.Finish();
fb2.Finish(offset);
cache_rq->rq_.add_buf_data(fb2.GetBufferPointer(), fb2.GetSize());
-cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(&batch_wait)));
+cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(batch_wait.get())));
RETURN_IF_NOT_OK(PushRequest(worker_id, cache_rq));
} else {
// Nothing to fetch but we still need to post something back into the wait area.
-RETURN_IF_NOT_OK(batch_wait.Set(Status::OK()));
+RETURN_IF_NOT_OK(batch_wait->Set(Status::OK()));
}
}
// Now wait for all of them to come back.
-RETURN_IF_NOT_OK(batch_wait.Wait());
+RETURN_IF_NOT_OK(batch_wait->Wait());
// Return the result
-return batch_wait.GetRc();
+return batch_wait->GetRc();
}
Status CacheServer::BatchFetchRows(CacheRequest *rq, CacheReply *reply) {
@@ -733,7 +733,7 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) {
offset_addr = strtoll(rq->buf_data(1).data(), nullptr, 10);
auto p = reinterpret_cast<char *>(reinterpret_cast<int64_t>(base) + offset_addr);
num_elem = strtol(rq->buf_data(2).data(), nullptr, 10);
-BatchWait batch_wait = BatchWait(num_elem);
+auto batch_wait = std::make_unique<BatchWait>(num_elem);
// Get a set of free request and push into the queues.
for (auto i = 0; i < num_elem; ++i) {
auto start = reinterpret_cast<int64_t>(p);
@@ -754,13 +754,13 @@ Status CacheServer::BatchCacheRows(CacheRequest *rq) {
cache_rq->rq_.add_buf_data(cookie);
cache_rq->rq_.add_buf_data(std::to_string(start - reinterpret_cast<int64_t>(base)));
cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(p - start)));
-cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(&batch_wait)));
+cache_rq->rq_.add_buf_data(std::to_string(reinterpret_cast<int64_t>(batch_wait.get())));
RETURN_IF_NOT_OK(PushRequest(GetRandomWorker(), cache_rq));
}
// Now wait for all of them to come back.
-RETURN_IF_NOT_OK(batch_wait.Wait());
+RETURN_IF_NOT_OK(batch_wait->Wait());
// Return the result
-return batch_wait.GetRc();
+return batch_wait->GetRc();
} catch (const std::exception &e) {
RETURN_STATUS_UNEXPECTED(e.what());
}
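Both BatchFetch and BatchCacheRows above now heap-allocate the wait object and pass its address to the workers as a decimal string inside the request buffer; each worker decodes the address and posts its result, while the issuing thread blocks on Wait() and returns GetRc(). A minimal standalone sketch of that handoff, with a simplified count-down latch standing in for BatchWait and plain std::thread workers standing in for the request queues (all names are illustrative, not the MindSpore API):

  #include <condition_variable>
  #include <cstdint>
  #include <cstdlib>
  #include <memory>
  #include <mutex>
  #include <string>
  #include <thread>
  #include <vector>

  // Simplified stand-in for BatchWait: a count-down latch that also carries a result code.
  class MiniBatchWait {
   public:
    explicit MiniBatchWait(int n) : remaining_(n) {}
    void Set(int rc) {
      std::lock_guard<std::mutex> lk(mu_);
      if (rc != 0) rc_ = rc;               // remember a non-OK result
      if (--remaining_ == 0) cv_.notify_all();
    }
    int Wait() {
      std::unique_lock<std::mutex> lk(mu_);
      cv_.wait(lk, [this] { return remaining_ == 0; });
      return rc_;
    }
   private:
    std::mutex mu_;
    std::condition_variable cv_;
    int remaining_;
    int rc_ = 0;
  };

  int main() {
    const int num_elements = 4;
    // Heap-allocate the latch (mirroring the std::make_unique change above) so that a
    // stable address can be serialized into every per-row request.
    auto batch_wait = std::make_unique<MiniBatchWait>(num_elements);
    std::string addr = std::to_string(reinterpret_cast<int64_t>(batch_wait.get()));

    std::vector<std::thread> workers;
    for (int i = 0; i < num_elements; ++i) {
      workers.emplace_back([addr] {
        // Worker side: decode the address back into a latch pointer and post a result.
        auto *bw = reinterpret_cast<MiniBatchWait *>(std::strtoll(addr.c_str(), nullptr, 10));
        bw->Set(0);
      });
    }
    int rc = batch_wait->Wait();           // block until every worker has posted
    for (auto &t : workers) t.join();
    return rc;
  }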