Fix the random crash. Use a thread safe 'ThreadReturnPromise' instead of the ThreadFuture.

This commit is contained in:
Xin Dong 2020-03-16 13:36:55 -07:00
parent 5967ef5eab
commit 89861c661e
3 changed files with 19 additions and 20 deletions

View File

@ -749,17 +749,12 @@ ACTOR Future<Void> workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol
ACTOR Future<Void> monitorTraceLogIssues(Optional<Reference<AsyncVar<std::set<std::string>>>> issues) {
state bool pingTimeout = false;
state ThreadFuture<Void> f;
state Reference<CompletionCallback<Void>> callback;
loop {
wait(delay(SERVER_KNOBS->TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS));
f = ThreadFuture<Void>(new ThreadSingleAssignmentVar<Void>);
callback = Reference<CompletionCallback<Void>>(new CompletionCallback<Void>(f));
callback->self = callback;
f.callOrSetAsCallback(callback.getPtr(), callback->userParam, 0);
pingTraceLogWriterThread(f);
TraceEvent("CrashDebugPingActionSetupInWorker");
Future<Void> pingAck = pingTraceLogWriterThread();
try {
wait(timeoutError(callback->promise.getFuture(), SERVER_KNOBS->TRACE_LOG_PING_TIMEOUT_SECONDS));
wait(timeoutError(pingAck, SERVER_KNOBS->TRACE_LOG_PING_TIMEOUT_SECONDS));
} catch (Error& e) {
if (e.code() == error_code_timed_out) {
pingTimeout = true;

View File

@ -312,16 +312,17 @@ public:
}
struct Ping : TypedAction<WriterThread, Ping> {
ThreadFuture<Void> p;
ThreadReturnPromise<Void> ack;
explicit Ping(ThreadFuture<Void> p) : p(p){};
explicit Ping(){};
virtual double getTimeEstimate() { return 0; }
};
void action(Ping& a) {
void action(Ping& ping) {
try {
((ThreadSingleAssignmentVar<Void>*)a.p.getPtr())->send(Void());
ping.ack.send(Void());
} catch (Error& e) {
TraceEvent(SevError, "PingActionFailed").error(e);
TraceEvent(SevError, "CrashDebugPingActionFailed").error(e);
throw;
}
}
};
@ -541,9 +542,11 @@ public:
}
}
void pingWriterThread(ThreadFuture<Void>& p) {
auto a = new WriterThread::Ping(p);
writer->post(a);
Future<Void> pingWriterThread() {
auto ping = new WriterThread::Ping;
writer->post(ping);
auto f = ping->ack.getFuture();
return f;
}
void retriveTraceLogIssues(std::set<std::string>& out) { return issues->retrieveIssues(out); }
@ -787,8 +790,8 @@ void retriveTraceLogIssues(std::set<std::string>& out) {
return g_traceLog.retriveTraceLogIssues(out);
}
void pingTraceLogWriterThread(ThreadFuture<Void>& p) {
return g_traceLog.pingWriterThread(p);
Future<Void> pingTraceLogWriterThread() {
return g_traceLog.pingWriterThread();
}
TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true), logged(false) {

View File

@ -609,8 +609,9 @@ void addTraceRole(std::string role);
void removeTraceRole(std::string role);
void retriveTraceLogIssues(std::set<std::string>& out);
template <class T>
struct ThreadFuture;
void pingTraceLogWriterThread(ThreadFuture<struct Void>& p);
struct Future;
struct Void;
Future<Void> pingTraceLogWriterThread();
enum trace_clock_t { TRACE_CLOCK_NOW, TRACE_CLOCK_REALTIME };
extern std::atomic<trace_clock_t> g_trace_clock;