Update safeThreadFutureToFuture and remove the unnecessary wrapper

This commit is contained in:
Chaoguang Lin 2021-04-22 19:34:24 -07:00
parent f0a236c544
commit 26569273b0
2 changed files with 24 additions and 33 deletions

View File

@ -22,6 +22,7 @@
#include "flow/Error.h"
#include "flow/UnitTest.h"
#include "flow/actorcompiler.h" // This must be the last #include.
#include "flow/flow.h"
#include <string>
ThreadCallback* ThreadCallback::addCallback(ThreadCallback* cb) {
@ -36,7 +37,7 @@ struct ThreadFutureSendObj {
// A simple thread object that cancels the threadFuture
struct ThreadFutureCancelObj {
ThreadFutureCancelObj(ThreadSingleAssignmentVar<Void>* tsav) : f(tsav) {}
ThreadFutureCancelObj(ThreadFuture<Void> f) : f(f) {}
void operator()() { f.cancel(); }
ThreadFuture<Void> f;
};
@ -52,13 +53,12 @@ TEST_CASE("/safeThreadFutureSend") {
return Void();
}
// This unit test should be running with TSAN enabled binary
// Test the case where the underlying threadFuture is cancelled
TEST_CASE("/safeThreadFutureCancel") {
auto* tsav = new ThreadSingleAssignmentVar<Void>;
state std::thread thread = std::thread{ ThreadFutureCancelObj(tsav) };
ThreadFuture<Void> f = onMainThread([]() -> Future<Void> { return Never(); });
state std::thread thread = std::thread{ ThreadFutureCancelObj(f) };
try {
ThreadFuture<Void> f(tsav);
wait(safeThreadFutureToFuture(f)); // this actor should be thrown actor_cancelled
wait(safeThreadFutureToFuture(f)); // this actor should get actor_cancelled
ASSERT(false);
} catch (Error& e) {
ASSERT(e.code() == error_code_actor_cancelled);

View File

@ -22,6 +22,7 @@
// When actually compiled (NO_INTELLISENSE), include the generated
// version of this file. In intellisense use the source version.
#include "flow/Error.h"
#if defined(NO_INTELLISENSE) && !defined(FLOW_THREADHELPER_ACTOR_G_H)
#define FLOW_THREADHELPER_ACTOR_G_H
#include "flow/ThreadHelper.actor.g.h"
@ -556,40 +557,30 @@ private:
// The underlying actor that converts ThreadFuture from Future
// Note: should be used from main thread
// The cancellation here works both way
// If the underlying "threadFuture" is cancelled, this actor will get actor_cancelled.
// If instead, this actor is cancelled, we will also cancel the underlying "threadFuture"
// Note: we are required to have unique ownership of the "threadFuture"
ACTOR template <class T>
static Future<Void> safeThreadFutureToFutureActor(Promise<T> result, ThreadFuture<T> threadFuture) {
Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) {
Promise<Void> ready;
Future<Void> onReady = ready.getFuture();
auto savPtr = ready.extractRawPointer();
UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, savPtr);
UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, ready.extractRawPointer());
int unused = 0;
threadFuture.callOrSetAsCallback(callback, unused, 0);
wait(onReady);
try {
wait(onReady);
} catch (Error& e) {
ASSERT(e.code() == error_code_actor_cancelled);
// prerequisite: we have exclusive ownership of the threadFuture
threadFuture.cancel();
throw e;
}
// threadFuture should be ready
ASSERT(threadFuture.isReady());
if (threadFuture.isError())
result.sendError(threadFuture.getError());
result.send(threadFuture.get());
return Void();
}
// A wrapper actor used for cancellation
ACTOR template <class T>
static Future<T> safeThreadFutureToFutureCancellableActor(ThreadFuture<T> threadFuture) {
state Promise<T> result;
Future<Void> cancellable = safeThreadFutureToFutureActor(result, threadFuture);
threadFuture.getPtr()->setCancel(Future<Void>(cancellable));
wait(cancellable);
Future<T> ready = result.getFuture();
if (ready.isError())
throw ready.getError();
return ready.get();
}
// Converts a ThreadFuture into a Future
// Note: This is a thread-safe method when used from the main thread and supports cancellation
template <class T>
Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) {
return safeThreadFutureToFutureCancellableActor(threadFuture);
throw threadFuture.getError();
return threadFuture.get();
}
ACTOR template <class R, class F>