Changed issue reporting to be thread safe. Also changed the liveness ping to be thread safe.

This commit is contained in:
Xin Dong 2020-01-29 13:21:50 -08:00
parent a6580dc15f
commit f4f860bfa8
6 changed files with 71 additions and 32 deletions

View File

@ -498,8 +498,8 @@ ACTOR Future<Void> tLog(IKeyValueStore* persistentData, IDiskQueue* persistentQu
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> ccf, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Optional<Reference<AsyncVar<std::set<StringRef>>>> unsuccessfulFlushCount =
Optional<Reference<AsyncVar<std::set<StringRef>>>>());
Optional<Reference<AsyncVar<std::set<std::string>>>> unsuccessfulFlushCount =
Optional<Reference<AsyncVar<std::set<std::string>>>>());
ACTOR Future<Void> resolver(ResolverInterface proxy, InitializeResolverRequest initReq,
Reference<AsyncVar<ServerDBInfo>> db);
ACTOR Future<Void> logRouter(TLogInterface interf, InitializeLogRouterRequest req,

View File

@ -41,6 +41,7 @@
#include "fdbclient/MonitorLeader.h"
#include "fdbclient/ClientWorkerInterface.h"
#include "flow/Profiler.h"
#include "flow/ThreadHelper.actor.h"
#ifdef __linux__
#include <fcntl.h>
@ -746,14 +747,18 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol
return Void();
}
ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<StringRef>>>> issues) {
ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
state bool pingTimeout = false;
loop {
wait(delay(SERVER_KNOBS->TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS));
Promise<Void> p;
pingTraceLogWriterThread(p);
ThreadFuture<Void> f(new ThreadSingleAssignmentVar<Void>);
Reference<CompletionCallback<Void>> callback =
Reference<CompletionCallback<Void>>(new CompletionCallback<Void>(f));
callback->self = callback;
f.callOrSetAsCallback(callback.getPtr(), callback->userParam, 0);
pingTraceLogWriterThread(f);
try {
wait(timeoutError(p.getFuture(), SERVER_KNOBS->TRACE_LOG_PING_TIMEOUT_SECONDS));
wait(timeoutError(callback->promise.getFuture(), SERVER_KNOBS->TRACE_LOG_PING_TIMEOUT_SECONDS));
} catch (Error& e) {
if (e.code() == error_code_timed_out) {
pingTimeout = true;
@ -762,10 +767,10 @@ ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<St
}
}
if (issues.present()) {
std::set<StringRef> _issues = getTraceLogIssues();
std::set<std::string> _issues = getTraceLogIssues();
if (pingTimeout) {
// Ping trace log writer thread timeout.
_issues.insert(LiteralStringRef("trace_log_writer_thread_likely_died"));
_issues.insert("trace_log_writer_thread_likely_died");
pingTimeout = false;
}
if (_issues.size() > 0) {
@ -778,7 +783,7 @@ ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<St
ACTOR Future<Void> monitorServerDBInfo(Reference<AsyncVar<Optional<ClusterControllerFullInterface>>> ccInterface,
Reference<ClusterConnectionFile> connFile, LocalityData locality,
Reference<AsyncVar<ServerDBInfo>> dbInfo,
Optional<Reference<AsyncVar<std::set<StringRef>>>> issues) {
Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
// Initially most of the serverDBInfo is not known, but we know our locality right away
ServerDBInfo localInfo;
localInfo.myLocality = locality;
@ -909,7 +914,7 @@ ACTOR Future<Void> workerServer(
state WorkerInterface interf( locality );
interf.initEndpoints();
state Reference<AsyncVar<std::set<StringRef>>> issues(new AsyncVar<std::set<StringRef>>());
state Reference<AsyncVar<std::set<std::string>>> issues(new AsyncVar<std::set<std::string>>());
folder = abspath(folder);

View File

@ -48,8 +48,11 @@
#include <fcntl.h>
#include <cmath>
#include <utility>
FileTraceLogWriter::FileTraceLogWriter(std::string directory, std::string processName, std::string basename, std::string extension, uint64_t maxLogsSize, std::function<void()> onError)
: directory(directory), processName(processName), basename(basename), extension(extension), maxLogsSize(maxLogsSize), traceFileFD(-1), index(0), onError(onError) {}
// Builds a writer from the trace file naming components. No file is opened
// here: traceFileFD starts at -1 and index at 0. `issues` is the sink that
// receives problem reports; `onError` is the callback run by lastError() for
// errors other than EINTR.
FileTraceLogWriter::FileTraceLogWriter(std::string directory, std::string processName, std::string basename,
                                       std::string extension, uint64_t maxLogsSize, std::function<void()> onError,
                                       Reference<ITraceLogIssuesReporter> issues)
  : directory(std::move(directory)), processName(std::move(processName)), basename(std::move(basename)),
    extension(std::move(extension)), maxLogsSize(maxLogsSize), traceFileFD(-1), index(0),
    // `issues` is declared before `onError` in the class, so it is listed first
    // to match the real initialization order.
    issues(std::move(issues)), onError(std::move(onError)) {}
void FileTraceLogWriter::addref() {
ReferenceCounted<FileTraceLogWriter>::addref();
@ -64,6 +67,7 @@ void FileTraceLogWriter::lastError(int err) {
// the error and the occurrence of the error are unblocked, even though we haven't actually succeeded in flushing.
// Otherwise a permanent write error would make the program block forever.
if (err != 0 && err != EINTR) {
issues->addIssue("trace_log_writer_flush_error_" + std::to_string(err));
onError();
}
}
@ -79,7 +83,7 @@ void FileTraceLogWriter::write(const std::string& str) {
remaining -= ret;
ptr += ret;
} else {
issues.insert(LiteralStringRef("trace_log_writer_flush_failure"));
issues->addIssue("trace_log_writer_flush_failure");
fprintf(stderr, "Unexpected error [%d] when flushing trace log.\n", errno);
lastError(errno);
threadSleep(0.1);
@ -87,10 +91,6 @@ void FileTraceLogWriter::write(const std::string& str) {
}
}
std::set<StringRef> FileTraceLogWriter::getTraceLogIssues() {
return std::move(issues);
}
void FileTraceLogWriter::open() {
cleanupTraceFiles();
@ -117,6 +117,7 @@ void FileTraceLogWriter::open() {
}
else {
fprintf(stderr, "ERROR: could not create trace log file `%s' (%d: %s)\n", finalname.c_str(), errno, strerror(errno));
issues->addIssue("trace_log_writer_could_not_create_trace_log_file");
int errorNum = errno;
onMainThreadVoid([finalname, errorNum]{

View File

@ -39,12 +39,13 @@ private:
uint64_t maxLogsSize;
int traceFileFD;
uint32_t index;
std::set<StringRef> issues;
Reference<ITraceLogIssuesReporter> issues;
std::function<void()> onError;
public:
FileTraceLogWriter(std::string directory, std::string processName, std::string basename, std::string extension, uint64_t maxLogsSize, std::function<void()> onError);
FileTraceLogWriter(std::string directory, std::string processName, std::string basename, std::string extension,
uint64_t maxLogsSize, std::function<void()> onError, Reference<ITraceLogIssuesReporter> issues);
void addref();
void delref();
@ -58,7 +59,6 @@ public:
void sync();
void cleanupTraceFiles();
std::set<StringRef> getTraceLogIssues() override;
};
#endif

View File

@ -220,6 +220,28 @@ public:
}
};
// Mutex-guarded collection of trace-log issue strings. Issues are added with
// addIssue() and drained with getAndFlushIssues(); every access to the set
// takes `mutex` first.
struct IssuesList : ITraceLogIssuesReporter, ThreadSafeReferenceCounted<IssuesList> {
	IssuesList() {}

	// Implements the ITraceLogIssuesReporter entry point. Without this override
	// the struct would remain abstract (addIssue is pure virtual in the
	// interface) and could not be instantiated.
	void addIssue(std::string issue) override {
		MutexHolder h(mutex);
		issues.insert(std::move(issue));
	}

	// Kept for callers of the earlier name; takes a const reference so that
	// literals and temporaries bind as well.
	void insertIssue(const std::string& issue) { addIssue(issue); }

	// Returns every issue recorded since the previous call and leaves the list empty.
	std::set<std::string> getAndFlushIssues() override {
		MutexHolder h(mutex);
		std::set<std::string> flushed;
		// swap() guarantees `issues` ends up empty; a moved-from set is only
		// required to be "valid but unspecified".
		flushed.swap(issues);
		return flushed;
	}

	void addref() { ThreadSafeReferenceCounted<IssuesList>::addref(); }
	void delref() { ThreadSafeReferenceCounted<IssuesList>::delref(); }

private:
	Mutex mutex;
	std::set<std::string> issues;
};

// Created eagerly: the TraceLog constructor does not initialize this member,
// while the file writer and getTraceLogIssues() dereference it.
Reference<IssuesList> issues = Reference<IssuesList>(new IssuesList());
Reference<BarrierList> barriers;
struct WriterThread : IThreadPoolReceiver {
@ -282,12 +304,12 @@ public:
}
struct Ping : TypedAction<WriterThread, Ping> {
Promise<Void> p;
ThreadFuture<Void> p;
explicit Ping(Promise<Void> p) : p(p){};
explicit Ping(ThreadFuture<Void> p) : p(p){};
virtual double getTimeEstimate() { return 0; }
};
void action(Ping& a) { a.p.send(Void()); }
void action(Ping& a) { ((ThreadSingleAssignmentVar<Void>*)a.p.getPtr())->send(Void()); }
};
TraceLog() : bufferLength(0), loggedLength(0), opened(false), preopenOverflowCount(0), barriers(new BarrierList), logTraceEventMetrics(false), formatter(new XmlTraceLogFormatter()) {}
@ -303,7 +325,9 @@ public:
this->localAddress = na;
basename = format("%s/%s.%s.%s", directory.c_str(), processName.c_str(), timestamp.c_str(), deterministicRandom()->randomAlphaNumeric(6).c_str());
logWriter = Reference<ITraceLogWriter>(new FileTraceLogWriter(directory, processName, basename, formatter->getExtension(), maxLogsSize, [this](){ barriers->triggerAll(); }));
logWriter = Reference<ITraceLogWriter>(new FileTraceLogWriter(directory, processName, basename,
formatter->getExtension(), maxLogsSize,
[this]() { barriers->triggerAll(); }, issues));
if ( g_network->isSimulated() )
writer = Reference<IThreadPool>(new DummyThreadPool());
@ -501,12 +525,12 @@ public:
}
}
void pingWriterThread(Promise<Void>& p) {
// Posts a Ping action carrying `p` to the writer thread pool.
void pingWriterThread(ThreadFuture<Void>& p) {
	writer->post(new WriterThread::Ping(p));
}
std::set<StringRef> getTraceLogIssues() { return logWriter->getTraceLogIssues(); }
std::set<std::string> getTraceLogIssues() { return issues->getAndFlushIssues(); }
~TraceLog() {
close();
@ -699,7 +723,6 @@ void removeTraceRole(std::string role) {
g_traceLog.removeRole(role);
}
<<<<<<< HEAD
TraceEvent::TraceEvent() : initialized(true), enabled(false), logged(true) {}
TraceEvent::TraceEvent(TraceEvent &&ev) {
@ -747,10 +770,13 @@ uint64_t getUnsuccessfulFlushCount() {
return g_traceLog.getUnsuccessfulFlushCount();
}
std::set<StringRef> getTraceLogIssues() {
=======
std::set<std::string> getTraceLogIssues() {
>>>>>>> Changed issue reporting to be thread safe. Also changed the liveness ping to be thread safe.
return std::move(g_traceLog.getTraceLogIssues());
}
void pingTraceLogWriterThread(Promise<Void>& p) {
// Free-function entry point: forwards the ping to the global trace log.
void pingTraceLogWriterThread(ThreadFuture<Void>& p) {
	g_traceLog.pingWriterThread(p);
}

View File

@ -516,7 +516,6 @@ struct ITraceLogWriter {
virtual void addref() = 0;
virtual void delref() = 0;
virtual std::set<struct StringRef> getTraceLogIssues() = 0;
};
struct ITraceLogFormatter {
@ -529,6 +528,14 @@ struct ITraceLogFormatter {
virtual void delref() = 0;
};
// Abstract sink for trace-log issue strings. In this change FileTraceLogWriter
// reports through it and IssuesList derives from it.
struct ITraceLogIssuesReporter {
// Records one issue. Callers in this change pass short identifiers such as
// "trace_log_writer_flush_failure".
virtual void addIssue(std::string issue) = 0;
// Returns the recorded issues.
// NOTE(review): the name implies the stored issues are also cleared — confirm
// in each implementation.
virtual std::set<std::string> getAndFlushIssues() = 0;
// Reference-counting hooks (IssuesList forwards these to
// ThreadSafeReferenceCounted).
virtual void addref() = 0;
virtual void delref() = 0;
};
struct TraceInterval {
TraceInterval( const char* type ) : count(-1), type(type), severity(SevInfo) {}
@ -587,10 +594,10 @@ bool validateTraceClockSource(std::string source);
void addTraceRole(std::string role);
void removeTraceRole(std::string role);
std::set<struct StringRef> getTraceLogIssues();
std::set<std::string> getTraceLogIssues();
template <class T>
struct Promise;
void pingTraceLogWriterThread(Promise<struct Void>& p);
struct ThreadFuture;
void pingTraceLogWriterThread(ThreadFuture<struct Void>& p);
enum trace_clock_t { TRACE_CLOCK_NOW, TRACE_CLOCK_REALTIME };
extern std::atomic<trace_clock_t> g_trace_clock;