Addressed review comments. Fix the bug where issues on a worker may be wrongly cleared by subsequent GetDBinfo request.

This commit is contained in:
Xin Dong 2020-02-05 10:05:48 -08:00
parent 6325c40336
commit 090c89e90a
10 changed files with 53 additions and 40 deletions

View File

@ -164,6 +164,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"incorrect_cluster_file_contents",
"trace_log_file_write_error",
"trace_log_could_not_create_file",
"trace_log_writer_thread_unresponsive",
"process_error",
"io_error",
"io_timeout",
@ -403,7 +404,8 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
"$enum":[
"incorrect_cluster_file_contents",
"trace_log_file_write_error",
"trace_log_could_not_create_file"
"trace_log_could_not_create_file",
"trace_log_writer_thread_unresponsive"
]
},
"description":"Cluster file contents do not match current cluster connection string. Verify cluster file is writable and has not been overwritten externally."
@ -413,7 +415,7 @@ const KeyRef JSONSchemas::statusSchema = LiteralStringRef(R"statusSchema(
}
],
)statusSchema"
R"statusSchema(
R"statusSchema(
"recovery_state":{
"required_resolvers":1,
"required_proxies":1,

View File

@ -514,7 +514,6 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( DEGRADED_WARNING_LIMIT, 1 );
init( DEGRADED_WARNING_RESET_DELAY, 7*24*60*60 );
init( TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS, 10 );
init( TRACE_LOG_FLUSH_FAILURE_REPORT_THRESHOLD, 100 );
init( TRACE_LOG_PING_TIMEOUT_SECONDS, 5.0 );
// Test harness

View File

@ -454,7 +454,6 @@ public:
double DEGRADED_WARNING_LIMIT;
double DEGRADED_WARNING_RESET_DELAY;
int64_t TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS;
int64_t TRACE_LOG_FLUSH_FAILURE_REPORT_THRESHOLD;
double TRACE_LOG_PING_TIMEOUT_SECONDS;
// Test harness

View File

@ -498,8 +498,8 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQu
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> ccf, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Optional<Reference<AsyncVar<std::set<std::string>>>> issues =
Optional<Reference<AsyncVar<std::set<std::string>>>>());
Optional<Reference<AsyncVar<std::vector<std::string>>>> issues =
Optional<Reference<AsyncVar<std::vector<std::string>>>>());
ACTOR Future<Void> resolver(ResolverInterface proxy, InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> logRouter(TLogInterface interf, InitializeLogRouterRequest req,

View File

@ -747,7 +747,7 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol
return Void();
}
ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::vector<std::string>>>> issues) {
state bool pingTimeout = false;
loop {
wait(delay(SERVER_KNOBS->TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS));
@ -767,15 +767,14 @@ ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<st
}
}
if (issues.present()) {
std::set<std::string> _issues = getTraceLogIssues();
std::vector<std::string> _issues;
retriveTraceLogIssues(_issues);
if (pingTimeout) {
// Ping trace log writer thread timeout.
_issues.insert("trace_log_writer_thread_unresponsive");
_issues.push_back("trace_log_writer_thread_unresponsive");
pingTimeout = false;
}
if (_issues.size() > 0) {
issues.get()->set(_issues);
}
issues.get()->set(_issues);
}
}
}
@ -783,23 +782,21 @@ ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<st
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> connFile, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
Optional<Reference<AsyncVar<std::vector<std::string>>>> issues) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo;
localInfo.myLocality = locality;
dbInfo->set(localInfo);
state Optional<double> incorrectTime;
state bool checkIssues = false;
loop {
GetServerDBInfoRequest req;
req.knownServerInfoID = dbInfo->get().id;
if (issues.present() && checkIssues) {
if (issues.present()) {
for (auto const& i : issues.get()->get()) {
req.issues.push_back_deep(req.issues.arena(), i);
}
checkIssues = false;
}
ClusterConnectionString fileConnectionString;
@ -845,7 +842,7 @@ ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterContro
if(ccInterface->get().present())
TraceEvent("GotCCInterfaceChange").detail("CCID", ccInterface->get().get().id()).detail("CCMachine", ccInterface->get().get().getWorkers.getEndpoint().getPrimaryAddress());
}
when(wait(issues.present() ? issues.get()->onChange() : Never())) { checkIssues = true; }
when(wait(issues.present() ? issues.get()->onChange() : Never())) {}
}
}
}
@ -914,7 +911,7 @@ ACTOR Future<Void> workerServer(
state WorkerInterface interf( locality );
interf.initEndpoints();
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
state Reference<AsyncVar<std::vector<std::string>>> issues(new AsyncVar<std::vector<std::string>>());
folder = abspath(folder);

View File

@ -82,7 +82,7 @@ void FileTraceLogWriter::write(const std::string& str) {
remaining -= ret;
ptr += ret;
} else {
issues->addIssue("trace_log_file_write_error");
issues->addAndExpire("trace_log_file_write_error");
fprintf(stderr, "Unexpected error [%d] when flushing trace log.\n", errno);
lastError(errno);
threadSleep(0.1);
@ -116,7 +116,7 @@ void FileTraceLogWriter::open() {
}
else {
fprintf(stderr, "ERROR: could not create trace log file `%s' (%d: %s)\n", finalname.c_str(), errno, strerror(errno));
issues->addIssue("trace_log_could_not_create_file");
issues->addAndExpire("trace_log_could_not_create_file");
int errorNum = errno;
onMainThreadVoid([finalname, errorNum]{

View File

@ -161,6 +161,7 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( TRACE_EVENT_THROTTLER_MSG_LIMIT, 20000 );
init( MAX_TRACE_FIELD_LENGTH, 495 ); // If the value of this is changed, the corresponding default in Trace.cpp should be changed as well
init( MAX_TRACE_EVENT_LENGTH, 4000 ); // If the value of this is changed, the corresponding default in Trace.cpp should be changed as well
init( TRACE_LOG_ISSUE_EXPIRATION_INTERVAL, 5.0);
//TDMetrics
init( MAX_METRICS, 600 );

View File

@ -182,6 +182,7 @@ public:
int TRACE_EVENT_THROTTLER_MSG_LIMIT;
int MAX_TRACE_FIELD_LENGTH;
int MAX_TRACE_EVENT_LENGTH;
double TRACE_LOG_ISSUE_EXPIRATION_INTERVAL;
//TDMetrics
int64_t MAX_METRIC_SIZE;

View File

@ -29,6 +29,7 @@
#include <stdarg.h>
#include <cctype>
#include <time.h>
#include <set>
#include "flow/IThreadPool.h"
#include "flow/ThreadHelper.actor.h"
@ -222,19 +223,31 @@ public:
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
IssuesList() : moved(false){};
void addIssue(std::string issue) override {
void addAndExpire(std::string issue, double expirationInterval) override {
MutexHolder h(mutex);
if (moved) {
issues = std::set<std::string>();
moved = false;
auto now = ::now();
if (issues.find(issue) != issues.end()) {
issues[issue]++;
} else {
issues[issue] = 1;
}
issues.insert(issue);
queue.emplace_back(now + expirationInterval, issue);
}
std::set<std::string> getAndFlushIssues() override {
void retrieveIssues(std::vector<std::string>& out) override {
MutexHolder h(mutex);
moved = true;
return std::move(issues);
// clean up any expired events first
auto now = ::now();
while (queue.size() > 0 && queue.front().first <= now) {
ASSERT(issues.find(queue.front().second) != issues.end());
if (--issues[queue.front().second] == 0) {
issues.erase(queue.front().second);
}
queue.pop_front();
}
for (auto const& i : issues) {
out.push_back(i.first);
}
}
void addref() { ThreadSafeReferenceCounted<IssuesList>::addref(); }
@ -243,7 +256,8 @@ public:
private:
Mutex mutex;
bool moved;
std::set<std::string> issues;
std::unordered_map<std::string, int64_t> issues;
Deque<std::pair<double, std::string>> queue;
};
Reference<IssuesList> issues;
@ -538,7 +552,7 @@ public:
writer->post(a);
}
std::set<std::string> getTraceLogIssues() { return issues->getAndFlushIssues(); }
void retriveTraceLogIssues(std::vector<std::string>& out) { return issues->retrieveIssues(out); }
~TraceLog() {
close();
@ -774,13 +788,9 @@ TraceEvent& TraceEvent::operator=(TraceEvent &&ev) {
return *this;
}
uint64_t getUnsuccessfulFlushCount() {
return g_traceLog.getUnsuccessfulFlushCount();
}
std::set<std::string> getTraceLogIssues() {
return std::move(g_traceLog.getTraceLogIssues());
return g_traceLog.getTraceLogIssues();
void retriveTraceLogIssues(std::vector<std::string>& out) {
return g_traceLog.retriveTraceLogIssues(out);
}
void pingTraceLogWriterThread(ThreadFuture<Void>& p) {

View File

@ -27,7 +27,6 @@
#include <stdint.h>
#include <string>
#include <map>
#include <set>
#include <type_traits>
#include "flow/IRandom.h"
#include "flow/Error.h"
@ -530,8 +529,13 @@ struct ITraceLogFormatter {
};
struct ITraceLogIssuesReporter {
virtual void addIssue(std::string issue) = 0;
virtual std::set<std::string> getAndFlushIssues() = 0;
// The issue will expire after (now + expirationInterval) seconds
virtual void addAndExpire(std::string issue,
double expirationInterval = FLOW_KNOBS->TRACE_LOG_ISSUE_EXPIRATION_INTERVAL) = 0;
// When called, this function will first clean up expired issues.
// If it's never called somehow and the trace log thread is struggling, the memory usage may build up.
virtual void retrieveIssues(std::vector<std::string>& out) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
@ -595,7 +599,7 @@ bool validateTraceClockSource(std::string source);
void addTraceRole(std::string role);
void removeTraceRole(std::string role);
std::set<std::string> getTraceLogIssues();
void retriveTraceLogIssues(std::vector<std::string>& out);
template <class T>
struct ThreadFuture;
void pingTraceLogWriterThread(ThreadFuture<struct Void>& p);