Resolve review comments. Changed how issues got cleared

This commit is contained in:
Xin Dong 2020-02-25 15:34:08 -08:00
parent 3f24ae93f2
commit f20619c9fb
5 changed files with 36 additions and 37 deletions

View File

@ -498,8 +498,8 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQu
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> ccf, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Optional<Reference<AsyncVar<std::vector<std::string>>>> issues =
Optional<Reference<AsyncVar<std::vector<std::string>>>>());
Optional<Reference<AsyncVar<std::set<std::string>>>> issues =
Optional<Reference<AsyncVar<std::set<std::string>>>>());
ACTOR Future<Void> resolver(ResolverInterface proxy, InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> logRouter(TLogInterface interf, InitializeLogRouterRequest req,

View File

@ -747,7 +747,7 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol
return Void();
}
ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::vector<std::string>>>> issues) {
ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
state bool pingTimeout = false;
loop {
wait(delay(SERVER_KNOBS->TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS));
@ -767,11 +767,11 @@ ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::vector
}
}
if (issues.present()) {
std::vector<std::string> _issues;
std::set<std::string> _issues;
retriveTraceLogIssues(_issues);
if (pingTimeout) {
// Ping trace log writer thread timeout.
_issues.push_back("trace_log_writer_thread_unresponsive");
_issues.insert("trace_log_writer_thread_unresponsive");
pingTimeout = false;
}
issues.get()->set(_issues);
@ -785,7 +785,7 @@ ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::vector
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> connFile, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Optional<Reference<AsyncVar<std::vector<std::string>>>> issues) {
Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo;
localInfo.myLocality = locality;
@ -914,7 +914,7 @@ ACTOR Future<Void> workerServer(
state WorkerInterface interf( locality );
interf.initEndpoints();
state Reference<AsyncVar<std::vector<std::string>>> issues(new AsyncVar<std::vector<std::string>>());
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
folder = abspath(folder);

View File

@ -74,6 +74,7 @@ void FileTraceLogWriter::lastError(int err) {
void FileTraceLogWriter::write(const std::string& str) {
auto ptr = str.c_str();
int remaining = str.size();
bool needsResolve = false;
while ( remaining ) {
int ret = __write( traceFileFD, ptr, remaining );
@ -81,8 +82,13 @@ void FileTraceLogWriter::write(const std::string& str) {
lastError(0);
remaining -= ret;
ptr += ret;
if (needsResolve) {
issues->resolveIssue("trace_log_file_write_error");
needsResolve = false;
}
} else {
issues->addAndExpire("trace_log_file_write_error");
issues->addIssue("trace_log_file_write_error");
needsResolve = true;
fprintf(stderr, "Unexpected error [%d] when flushing trace log.\n", errno);
lastError(errno);
threadSleep(0.1);
@ -92,6 +98,7 @@ void FileTraceLogWriter::write(const std::string& str) {
void FileTraceLogWriter::open() {
cleanupTraceFiles();
bool needsResolve = false;
++index;
@ -116,7 +123,8 @@ void FileTraceLogWriter::open() {
}
else {
fprintf(stderr, "ERROR: could not create trace log file `%s' (%d: %s)\n", finalname.c_str(), errno, strerror(errno));
issues->addAndExpire("trace_log_could_not_create_file");
issues->addIssue("trace_log_could_not_create_file");
needsResolve = true;
int errorNum = errno;
onMainThreadVoid([finalname, errorNum]{
@ -129,6 +137,9 @@ void FileTraceLogWriter::open() {
}
}
onMainThreadVoid([]{ latestEventCache.clear("TraceFileOpenError"); }, NULL);
if (needsResolve) {
issues->resolveIssue("trace_log_could_not_create_file");
}
lastError(0);
}

View File

@ -223,30 +223,19 @@ public:
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
IssuesList(){};
void addAndExpire(std::string issue, double expirationInterval) override {
void addIssue(std::string issue) override { issues.insert(issue); }
void retrieveIssues(std::set<std::string>& out) override {
MutexHolder h(mutex);
auto now = ::now();
if (issues.find(issue) != issues.end()) {
issues[issue]++;
} else {
issues[issue] = 1;
for (auto const& i : issues) {
out.insert(i);
}
queue.emplace_back(now + expirationInterval, issue);
}
void retrieveIssues(std::vector<std::string>& out) override {
void resolveIssue(std::string issue) override {
MutexHolder h(mutex);
// clean up any expired events first
auto now = ::now();
while (queue.size() > 0 && queue.front().first <= now) {
ASSERT(issues.find(queue.front().second) != issues.end());
if (--issues[queue.front().second] == 0) {
issues.erase(queue.front().second);
}
queue.pop_front();
}
for (auto const& i : issues) {
out.push_back(i.first);
if (issues.find(issue) != issues.end()) {
issues.erase(issue);
}
}
@ -255,8 +244,7 @@ public:
private:
Mutex mutex;
std::unordered_map<std::string, int64_t> issues;
Deque<std::pair<double, std::string>> queue;
std::set<std::string> issues;
};
Reference<IssuesList> issues;
@ -551,7 +539,7 @@ public:
writer->post(a);
}
void retriveTraceLogIssues(std::vector<std::string>& out) { return issues->retrieveIssues(out); }
// Forwards to the shared IssuesList, which adds its current issues to `out`.
void retriveTraceLogIssues(std::set<std::string>& out) {
issues->retrieveIssues(out);
}
~TraceLog() {
close();
@ -744,6 +732,7 @@ void removeTraceRole(std::string role) {
g_traceLog.removeRole(role);
}
<<<<<<< HEAD
TraceEvent::TraceEvent() : initialized(true), enabled(false), logged(true) {}
TraceEvent::TraceEvent(TraceEvent &&ev) {
@ -788,7 +777,7 @@ TraceEvent& TraceEvent::operator=(TraceEvent &&ev) {
return *this;
}
void retriveTraceLogIssues(std::vector<std::string>& out) {
// Free-function entry point: adds the issues currently tracked by the global
// trace log (g_traceLog) to `out`.
void retriveTraceLogIssues(std::set<std::string>& out) {
g_traceLog.retriveTraceLogIssues(out);
}

View File

@ -529,13 +529,12 @@ struct ITraceLogFormatter {
};
struct ITraceLogIssuesReporter {
// The issue will expire after (now + expirationInterval) seconds
virtual void addAndExpire(std::string issue,
double expirationInterval = FLOW_KNOBS->TRACE_LOG_ISSUE_EXPIRATION_INTERVAL) = 0;
virtual void addIssue(std::string issue) = 0;
virtual void resolveIssue(std::string issue) = 0;
// Adds every issue that has been added and not yet resolved to `out`.
// Issues no longer expire on their own; each one stays until resolveIssue() is called for it.
virtual void retrieveIssues(std::vector<std::string>& out) = 0;
virtual void retrieveIssues(std::set<std::string>& out) = 0;
virtual void addref() = 0;
virtual void delref() = 0;
@ -599,7 +598,7 @@ bool validateTraceClockSource(std::string source);
void addTraceRole(std::string role);
void removeTraceRole(std::string role);
void retriveTraceLogIssues(std::vector<std::string>& out);
void retriveTraceLogIssues(std::set<std::string>& out);
template <class T>
struct ThreadFuture;
void pingTraceLogWriterThread(ThreadFuture<struct Void>& p);