From f20619c9fbfdb46972bcf259fcd86245235f839d Mon Sep 17 00:00:00 2001 From: Xin Dong Date: Tue, 25 Feb 2020 15:34:08 -0800 Subject: [PATCH] Resolve review comments. Changed how issues got cleared --- fdbserver/WorkerInterface.actor.h | 4 ++-- fdbserver/worker.actor.cpp | 10 ++++----- flow/FileTraceLogWriter.cpp | 15 +++++++++++-- flow/Trace.cpp | 35 +++++++++++-------------------- flow/Trace.h | 9 ++++---- 5 files changed, 36 insertions(+), 37 deletions(-) diff --git a/fdbserver/WorkerInterface.actor.h b/fdbserver/WorkerInterface.actor.h index b63310e59e..c09cc5899a 100644 --- a/fdbserver/WorkerInterface.actor.h +++ b/fdbserver/WorkerInterface.actor.h @@ -498,8 +498,8 @@ ACTOR Future tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQu ACTOR Future monitorServerDBInfo(Reference>> ccInterface, Reference ccf, LocalityData locality, Reference> dbInfo, - Optional>>> issues = - Optional>>>()); + Optional>>> issues = + Optional>>>()); ACTOR Future resolver(ResolverInterface proxy, InitializeResolverRequest initReq, Reference> db); ACTOR Future logRouter(TLogInterface interf, InitializeLogRouterRequest req, diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index 298fa3655d..6460959ac9 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -747,7 +747,7 @@ ACTOR Future workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol return Void(); } -ACTOR Future monitorTraceLogIssues(Optional>>> issues) { +ACTOR Future monitorTraceLogIssues(Optional>>> issues) { state bool pingTimeout = false; loop { wait(delay(SERVER_KNOBS->TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS)); @@ -767,11 +767,11 @@ ACTOR Future monitorTraceLogIssues(Optional _issues; + std::set _issues; retriveTraceLogIssues(_issues); if (pingTimeout) { // Ping trace log writer thread timeout. - _issues.push_back("trace_log_writer_thread_unresponsive"); + _issues.insert("trace_log_writer_thread_unresponsive"); pingTimeout = false; } issues.get()->set(_issues); @@ -785,7 +785,7 @@ ACTOR Future monitorTraceLogIssues(Optional monitorServerDBInfo(Reference>> ccInterface, Reference connFile, LocalityData locality, Reference> dbInfo, - Optional>>> issues) { + Optional>>> issues) { // Initially most of the serverDBInfo is not known, but we know our locality right away ServerDBInfo localInfo; localInfo.myLocality = locality; @@ -914,7 +914,7 @@ ACTOR Future workerServer( state WorkerInterface interf( locality ); interf.initEndpoints(); - state Reference>> issues(new AsyncVar>()); + state Reference>> issues(new AsyncVar>()); folder = abspath(folder); diff --git a/flow/FileTraceLogWriter.cpp b/flow/FileTraceLogWriter.cpp index 937eb91735..3e4d0bdcd4 100644 --- a/flow/FileTraceLogWriter.cpp +++ b/flow/FileTraceLogWriter.cpp @@ -74,6 +74,7 @@ void FileTraceLogWriter::lastError(int err) { void FileTraceLogWriter::write(const std::string& str) { auto ptr = str.c_str(); int remaining = str.size(); + bool needsResolve = false; while ( remaining ) { int ret = __write( traceFileFD, ptr, remaining ); @@ -81,8 +82,13 @@ void FileTraceLogWriter::write(const std::string& str) { lastError(0); remaining -= ret; ptr += ret; + if (needsResolve) { + issues->resolveIssue("trace_log_file_write_error"); + needsResolve = false; + } } else { - issues->addAndExpire("trace_log_file_write_error"); + issues->addIssue("trace_log_file_write_error"); + needsResolve = true; fprintf(stderr, "Unexpected error [%d] when flushing trace log.\n", errno); lastError(errno); threadSleep(0.1); @@ -92,6 +98,7 @@ void FileTraceLogWriter::write(const std::string& str) { void FileTraceLogWriter::open() { cleanupTraceFiles(); + bool needsResolve = false; ++index; @@ -116,7 +123,8 @@ void FileTraceLogWriter::open() { } else { fprintf(stderr, "ERROR: could not create trace log file `%s' (%d: %s)\n", finalname.c_str(), errno, strerror(errno)); - issues->addAndExpire("trace_log_could_not_create_file"); + issues->addIssue("trace_log_could_not_create_file"); + needsResolve = true; int errorNum = errno; onMainThreadVoid([finalname, errorNum]{ @@ -129,6 +137,9 @@ void FileTraceLogWriter::open() { } } onMainThreadVoid([]{ latestEventCache.clear("TraceFileOpenError"); }, NULL); + if (needsResolve) { + issues->resolveIssue("trace_log_could_not_create_file"); + } lastError(0); } diff --git a/flow/Trace.cpp b/flow/Trace.cpp index e17202e53a..fdeab2e2a6 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -223,30 +223,19 @@ public: struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted { IssuesList(){}; - void addAndExpire(std::string issue, double expirationInterval) override { + void addIssue(std::string issue) override { issues.insert(issue); } + + void retrieveIssues(std::set& out) override { MutexHolder h(mutex); - auto now = ::now(); - if (issues.find(issue) != issues.end()) { - issues[issue]++; - } else { - issues[issue] = 1; + for (auto const& i : issues) { + out.insert(i); } - queue.emplace_back(now + expirationInterval, issue); } - void retrieveIssues(std::vector& out) override { + void resolveIssue(std::string issue) override { MutexHolder h(mutex); - // clean up any expired events first - auto now = ::now(); - while (queue.size() > 0 && queue.front().first <= now) { - ASSERT(issues.find(queue.front().second) != issues.end()); - if (--issues[queue.front().second] == 0) { - issues.erase(queue.front().second); - } - queue.pop_front(); - } - for (auto const& i : issues) { - out.push_back(i.first); + if (issues.find(issue) != issues.end()) { + issues.erase(issue); } } @@ -255,8 +244,7 @@ public: private: Mutex mutex; - std::unordered_map issues; - Deque> queue; + std::set issues; }; Reference issues; @@ -551,7 +539,7 @@ public: writer->post(a); } - void retriveTraceLogIssues(std::vector& out) { return issues->retrieveIssues(out); } + void retriveTraceLogIssues(std::set& out) { return issues->retrieveIssues(out); } ~TraceLog() { close(); @@ -744,6 +732,7 @@ void removeTraceRole(std::string role) { g_traceLog.removeRole(role); } +<<<<<<< HEAD TraceEvent::TraceEvent() : initialized(true), enabled(false), logged(true) {} TraceEvent::TraceEvent(TraceEvent &&ev) { @@ -788,7 +777,7 @@ TraceEvent& TraceEvent::operator=(TraceEvent &&ev) { return *this; } -void retriveTraceLogIssues(std::vector& out) { +void retriveTraceLogIssues(std::set& out) { return g_traceLog.retriveTraceLogIssues(out); } diff --git a/flow/Trace.h b/flow/Trace.h index 11d274c5c3..7d91eccc57 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -529,13 +529,12 @@ struct ITraceLogFormatter { }; struct ITraceLogIssuesReporter { - // The issue will expire after (now + expirationInterval) seconds - virtual void addAndExpire(std::string issue, - double expirationInterval = FLOW_KNOBS->TRACE_LOG_ISSUE_EXPIRATION_INTERVAL) = 0; + virtual void addIssue(std::string issue) = 0; + virtual void resolveIssue(std::string issue) = 0; // When called, this function will first clean up expired issues. // If it's never called somehow and the trace log thread is struggling, the memory usage may build up. - virtual void retrieveIssues(std::vector& out) = 0; + virtual void retrieveIssues(std::set& out) = 0; virtual void addref() = 0; virtual void delref() = 0; @@ -599,7 +598,7 @@ bool validateTraceClockSource(std::string source); void addTraceRole(std::string role); void removeTraceRole(std::string role); -void retriveTraceLogIssues(std::vector& out); +void retriveTraceLogIssues(std::set& out); template struct ThreadFuture; void pingTraceLogWriterThread(ThreadFuture& p);