From 89861c661e8a637369eb078680aa6ec628b68d60 Mon Sep 17 00:00:00 2001 From: Xin Dong Date: Mon, 16 Mar 2020 13:36:55 -0700 Subject: [PATCH] Fix the random crash. Use a thread safe 'ThreadReturnPromise' instead of the ThreadFuture. --- fdbserver/worker.actor.cpp | 11 +++-------- flow/Trace.cpp | 23 +++++++++++++---------- flow/Trace.h | 5 +++-- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/fdbserver/worker.actor.cpp b/fdbserver/worker.actor.cpp index face2a17fd..5420a89907 100644 --- a/fdbserver/worker.actor.cpp +++ b/fdbserver/worker.actor.cpp @@ -749,17 +749,12 @@ ACTOR Future workerSnapCreate(WorkerSnapRequest snapReq, StringRef snapFol ACTOR Future monitorTraceLogIssues(Optional>>> issues) { state bool pingTimeout = false; - state ThreadFuture f; - state Reference> callback; loop { wait(delay(SERVER_KNOBS->TRACE_LOG_FLUSH_FAILURE_CHECK_INTERVAL_SECONDS)); - f = ThreadFuture(new ThreadSingleAssignmentVar); - callback = Reference>(new CompletionCallback(f)); - callback->self = callback; - f.callOrSetAsCallback(callback.getPtr(), callback->userParam, 0); - pingTraceLogWriterThread(f); + TraceEvent("CrashDebugPingActionSetupInWorker"); + Future pingAck = pingTraceLogWriterThread(); try { - wait(timeoutError(callback->promise.getFuture(), SERVER_KNOBS->TRACE_LOG_PING_TIMEOUT_SECONDS)); + wait(timeoutError(pingAck, SERVER_KNOBS->TRACE_LOG_PING_TIMEOUT_SECONDS)); } catch (Error& e) { if (e.code() == error_code_timed_out) { pingTimeout = true; diff --git a/flow/Trace.cpp b/flow/Trace.cpp index 4cf1cedb37..c769baca4d 100644 --- a/flow/Trace.cpp +++ b/flow/Trace.cpp @@ -312,16 +312,17 @@ public: } struct Ping : TypedAction { - ThreadFuture p; + ThreadReturnPromise ack; - explicit Ping(ThreadFuture p) : p(p){}; + explicit Ping(){}; virtual double getTimeEstimate() { return 0; } }; - void action(Ping& a) { + void action(Ping& ping) { try { - ((ThreadSingleAssignmentVar*)a.p.getPtr())->send(Void()); + ping.ack.send(Void()); } catch (Error& e) { - TraceEvent(SevError, "PingActionFailed").error(e); + TraceEvent(SevError, "CrashDebugPingActionFailed").error(e); + throw; } } }; @@ -541,9 +542,11 @@ public: } } - void pingWriterThread(ThreadFuture& p) { - auto a = new WriterThread::Ping(p); - writer->post(a); + Future pingWriterThread() { + auto ping = new WriterThread::Ping; + writer->post(ping); + auto f = ping->ack.getFuture(); + return f; } void retriveTraceLogIssues(std::set& out) { return issues->retrieveIssues(out); } @@ -787,8 +790,8 @@ void retriveTraceLogIssues(std::set& out) { return g_traceLog.retriveTraceLogIssues(out); } -void pingTraceLogWriterThread(ThreadFuture& p) { - return g_traceLog.pingWriterThread(p); +Future pingTraceLogWriterThread() { + return g_traceLog.pingWriterThread(); } TraceEvent::TraceEvent( const char* type, UID id ) : id(id), type(type), severity(SevInfo), initialized(false), enabled(true), logged(false) { diff --git a/flow/Trace.h b/flow/Trace.h index c4c26b90a8..bb3077c0a7 100644 --- a/flow/Trace.h +++ b/flow/Trace.h @@ -609,8 +609,9 @@ void addTraceRole(std::string role); void removeTraceRole(std::string role); void retriveTraceLogIssues(std::set& out); template -struct ThreadFuture; -void pingTraceLogWriterThread(ThreadFuture& p); +struct Future; +struct Void; +Future pingTraceLogWriterThread(); enum trace_clock_t { TRACE_CLOCK_NOW, TRACE_CLOCK_REALTIME }; extern std::atomic g_trace_clock;