Update the thread-safe Future-to-ThreadFuture method, added two TSAN unit tests

This commit is contained in:
Chaoguang Lin 2021-04-13 13:21:04 -07:00
parent 107f66e4e1
commit 4fdfca0dc0
4 changed files with 95 additions and 32 deletions

View File

@ -58,7 +58,7 @@ set(FLOW_SRCS
TLSConfig.actor.cpp
TLSConfig.actor.h
ThreadHelper.actor.h
ThreadHelper.cpp
ThreadHelper.actor.cpp
ThreadPrimitives.cpp
ThreadPrimitives.h
ThreadSafeQueue.h

View File

@ -0,0 +1,68 @@
/*
* ThreadHelper.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/ThreadHelper.actor.h"
#include "flow/Error.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include <string>
ThreadCallback* ThreadCallback::addCallback(ThreadCallback* cb) {
return (new ThreadMultiCallback())->addCallback(this)->addCallback(cb);
}
// A simple thread object that sends the result
struct ThreadFutureSendObj {
void operator()() { tsav->send(Void()); }
ThreadSingleAssignmentVar<Void>* tsav;
};
// A simple thread object that cancels the threadFuture
struct ThreadFutureCancelObj {
ThreadFutureCancelObj(ThreadSingleAssignmentVar<Void>* tsav) : f(tsav) {}
void operator()() { f.cancel(); }
ThreadFuture<Void> f;
};
// This unit test should be running with TSAN enabled binary
TEST_CASE("/safeThreadFutureSend") {
auto* tsav = new ThreadSingleAssignmentVar<Void>;
state std::thread thread = std::thread{ ThreadFutureSendObj{ tsav } };
ThreadFuture<Void> f(tsav);
// change this to unsafeThreadFutureToFuture will get a data-race failure
wait(safeThreadFutureToFuture(f));
thread.join();
return Void();
}
// This unit test should be running with TSAN enabled binary
TEST_CASE("/safeThreadFutureCancel") {
auto* tsav = new ThreadSingleAssignmentVar<Void>;
state std::thread thread = std::thread{ ThreadFutureCancelObj(tsav) };
try {
ThreadFuture<Void> f(tsav);
wait(safeThreadFutureToFuture(f)); // this actor should be thrown actor_cancelled
ASSERT(false);
} catch (Error& e) {
ASSERT(e.code() == error_code_actor_cancelled);
}
thread.join();
return Void();
}

View File

@ -534,7 +534,7 @@ Future<T> unsafeThreadFutureToFuture(ThreadFuture<T> threadFuture) {
// A callback waiting on a thread future and will delete itself once fired
template <class T>
struct UtilCallback : public ThreadCallback, ReferenceCounted<UtilCallback<T>> {
struct UtilCallback : public ThreadCallback {
public:
UtilCallback(ThreadFuture<T> f, void* userdata) : f(f), userdata(userdata) {}
@ -547,29 +547,49 @@ public:
g_network->onMainThread(Promise<Void>((SAV<Void>*)userdata), TaskPriority::DefaultOnMainThread);
delete this;
}
void destroy() override {}
private:
ThreadFuture<T> f;
void* userdata;
};
// The underlying actor that converts ThreadFuture from Future
// Note: should be used from main thread
ACTOR template <class T>
static Future<T> safeThreadFutureToFutureActor(ThreadFuture<T> threadFuture) {
static Future<Void> safeThreadFutureToFutureActor(Promise<T> result, ThreadFuture<T> threadFuture) {
Promise<Void> ready;
Future<Void> onReady = ready.getFuture();
UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, ready.extractRawPointer());
auto savPtr = ready.extractRawPointer();
UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, savPtr);
int unused = 0;
threadFuture.callOrSetAsCallback(callback, unused, 0);
wait(onReady);
// threadFuture should be ready
if (threadFuture.isError())
throw threadFuture.getError();
return threadFuture.get();
result.sendError(threadFuture.getError());
result.send(threadFuture.get());
return Void();
}
// A wrapper actor used for cancellation
ACTOR template <class T>
static Future<T> safeThreadFutureToFutureCancellableActor(ThreadFuture<T> threadFuture) {
state Promise<T> result;
Future<Void> cancellable = safeThreadFutureToFutureActor(result, threadFuture);
threadFuture.getPtr()->setCancel(Future<Void>(cancellable));
wait(cancellable);
Future<T> ready = result.getFuture();
if (ready.isError())
throw ready.getError();
return ready.get();
}
// Converts a ThreadFuture into a Future
// Note: This is a thread-safe method when used from the main thread and supports cancellation
template <class T>
Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) {
return safeThreadFutureToFutureActor(threadFuture);
return safeThreadFutureToFutureCancellableActor(threadFuture);
}
ACTOR template <class R, class F>

View File

@ -1,25 +0,0 @@
/*
* ThreadHelper.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "flow/ThreadHelper.actor.h"
ThreadCallback* ThreadCallback::addCallback(ThreadCallback* cb) {
return (new ThreadMultiCallback())->addCallback(this)->addCallback(cb);
}