From 26569273b0faf522ab7b8979641d9c5be23bfc6f Mon Sep 17 00:00:00 2001 From: Chaoguang Lin <chaoguang.lin@snowflake.com> Date: Thu, 22 Apr 2021 19:34:24 -0700 Subject: [PATCH] Update safeThreadFutureToFuture and remove the unnecessary wrapper --- flow/ThreadHelper.actor.cpp | 12 +++++----- flow/ThreadHelper.actor.h | 45 +++++++++++++++---------------------- 2 files changed, 24 insertions(+), 33 deletions(-) diff --git a/flow/ThreadHelper.actor.cpp b/flow/ThreadHelper.actor.cpp index e6d5d09346..785ae2baf4 100644 --- a/flow/ThreadHelper.actor.cpp +++ b/flow/ThreadHelper.actor.cpp @@ -22,6 +22,7 @@ #include "flow/Error.h" #include "flow/UnitTest.h" #include "flow/actorcompiler.h" // This must be the last #include. +#include "flow/flow.h" #include <string> ThreadCallback* ThreadCallback::addCallback(ThreadCallback* cb) { @@ -36,7 +37,7 @@ struct ThreadFutureSendObj { // A simple thread object that cancels the threadFuture struct ThreadFutureCancelObj { - ThreadFutureCancelObj(ThreadSingleAssignmentVar<Void>* tsav) : f(tsav) {} + ThreadFutureCancelObj(ThreadFuture<Void> f) : f(f) {} void operator()() { f.cancel(); } ThreadFuture<Void> f; }; @@ -52,13 +53,12 @@ TEST_CASE("/safeThreadFutureSend") { return Void(); } -// This unit test should be running with TSAN enabled binary +// Test the case where the underlying threadFuture is cancelled TEST_CASE("/safeThreadFutureCancel") { - auto* tsav = new ThreadSingleAssignmentVar<Void>; - state std::thread thread = std::thread{ ThreadFutureCancelObj(tsav) }; + ThreadFuture<Void> f = onMainThread([]() -> Future<Void> { return Never(); }); + state std::thread thread = std::thread{ ThreadFutureCancelObj(f) }; try { - ThreadFuture<Void> f(tsav); - wait(safeThreadFutureToFuture(f)); // this actor should be thrown actor_cancelled + wait(safeThreadFutureToFuture(f)); // this actor should get actor_cancelled ASSERT(false); } catch (Error& e) { ASSERT(e.code() == error_code_actor_cancelled); diff --git a/flow/ThreadHelper.actor.h b/flow/ThreadHelper.actor.h index dd1065481f..8ff7ac4702 100644 --- a/flow/ThreadHelper.actor.h +++ b/flow/ThreadHelper.actor.h @@ -22,6 +22,7 @@ // When actually compiled (NO_INTELLISENSE), include the generated // version of this file. In intellisense use the source version. +#include "flow/Error.h" #if defined(NO_INTELLISENSE) && !defined(FLOW_THREADHELPER_ACTOR_G_H) #define FLOW_THREADHELPER_ACTOR_G_H #include "flow/ThreadHelper.actor.g.h" @@ -556,40 +557,30 @@ private: // The underlying actor that converts ThreadFuture from Future // Note: should be used from main thread +// The cancellation here works both way +// If the underlying "threadFuture" is cancelled, this actor will get actor_cancelled. +// If instead, this actor is cancelled, we will also cancel the underlying "threadFuture" +// Note: we are required to have unique ownership of the "threadFuture" ACTOR template <class T> -static Future<Void> safeThreadFutureToFutureActor(Promise<T> result, ThreadFuture<T> threadFuture) { +Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) { Promise<Void> ready; Future<Void> onReady = ready.getFuture(); - auto savPtr = ready.extractRawPointer(); - UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, savPtr); + UtilCallback<T>* callback = new UtilCallback<T>(threadFuture, ready.extractRawPointer()); int unused = 0; threadFuture.callOrSetAsCallback(callback, unused, 0); - wait(onReady); + try { + wait(onReady); + } catch (Error& e) { + ASSERT(e.code() == error_code_actor_cancelled); + // prerequisite: we have exclusive ownership of the threadFuture + threadFuture.cancel(); + throw e; + } // threadFuture should be ready + ASSERT(threadFuture.isReady()); if (threadFuture.isError()) - result.sendError(threadFuture.getError()); - result.send(threadFuture.get()); - return Void(); -} - -// A wrapper actor used for cancellation -ACTOR template <class T> -static Future<T> safeThreadFutureToFutureCancellableActor(ThreadFuture<T> threadFuture) { - state Promise<T> result; - Future<Void> cancellable = safeThreadFutureToFutureActor(result, threadFuture); - threadFuture.getPtr()->setCancel(Future<Void>(cancellable)); - wait(cancellable); - Future<T> ready = result.getFuture(); - if (ready.isError()) - throw ready.getError(); - return ready.get(); -} - -// Converts a ThreadFuture into a Future -// Note: This is a thread-safe method when used from the main thread and supports cancellation -template <class T> -Future<T> safeThreadFutureToFuture(ThreadFuture<T> threadFuture) { - return safeThreadFutureToFutureCancellableActor(threadFuture); + throw threadFuture.getError(); + return threadFuture.get(); } ACTOR template <class R, class F>