move waitForMost into generic actors

This commit is contained in:
Xiaoxi Wang 2022-08-02 10:38:11 -07:00
parent b1ff8b8340
commit 3c76ad9e72
3 changed files with 68 additions and 72 deletions

View File

@ -812,24 +812,6 @@ ACTOR Future<ErrorOr<Void>> trySendSnapReq(RequestStream<WorkerSnapRequest> stre
return ErrorOr<Void>(Void());
}
ACTOR static Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures,
int faultTolerance,
Error e,
double waitMultiplierForSlowFutures = 1.0) {
state std::vector<Future<bool>> successFutures;
state double startTime = now();
successFutures.reserve(futures.size());
for (const auto& future : futures) {
successFutures.push_back(fmap([](auto const& result) { return result.present(); }, future));
}
bool success = wait(quorumEqualsTrue(successFutures, successFutures.size() - faultTolerance));
if (!success) {
throw e;
}
wait(delay((now() - startTime) * waitMultiplierForSlowFutures) || waitForAll(successFutures));
return Void();
}
ACTOR Future<std::map<NetworkAddress, std::pair<WorkerInterface, std::string>>> getStatefulWorkers(
Database cx,
Reference<AsyncVar<ServerDBInfo> const> dbInfo,
@ -1389,14 +1371,6 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
namespace data_distribution_test {
static Future<ErrorOr<Void>> goodTestFuture(double duration) {
return tag(delay(duration), ErrorOr<Void>(Void()));
}
static Future<ErrorOr<Void>> badTestFuture(double duration, Error e) {
return tag(delay(duration), ErrorOr<Void>(e));
}
inline DDShardInfo doubleToNoLocationShardInfo(double d, bool hasDest) {
DDShardInfo res(doubleToTestKey(d), anonymousShardId, anonymousShardId);
res.primarySrc.emplace_back((uint64_t)d, 0);
@ -1409,50 +1383,7 @@ inline DDShardInfo doubleToNoLocationShardInfo(double d, bool hasDest) {
} // namespace data_distribution_test
TEST_CASE("/DataDistribution/WaitForMost") {
state std::vector<Future<ErrorOr<Void>>> futures;
{
futures = { data_distribution_test::goodTestFuture(1),
data_distribution_test::goodTestFuture(2),
data_distribution_test::goodTestFuture(3) };
wait(waitForMost(futures, 1, operation_failed(), 0.0)); // Don't wait for slowest future
ASSERT(!futures[2].isReady());
}
{
futures = { data_distribution_test::goodTestFuture(1),
data_distribution_test::goodTestFuture(2),
data_distribution_test::goodTestFuture(3) };
wait(waitForMost(futures, 0, operation_failed(), 0.0)); // Wait for all futures
ASSERT(futures[2].isReady());
}
{
futures = { data_distribution_test::goodTestFuture(1),
data_distribution_test::goodTestFuture(2),
data_distribution_test::goodTestFuture(3) };
wait(waitForMost(futures, 1, operation_failed(), 1.0)); // Wait for slowest future
ASSERT(futures[2].isReady());
}
{
futures = { data_distribution_test::goodTestFuture(1),
data_distribution_test::goodTestFuture(2),
data_distribution_test::badTestFuture(1, success()) };
wait(waitForMost(futures, 1, operation_failed(), 1.0)); // Error ignored
}
{
futures = { data_distribution_test::goodTestFuture(1),
data_distribution_test::goodTestFuture(2),
data_distribution_test::badTestFuture(1, success()) };
try {
wait(waitForMost(futures, 0, operation_failed(), 1.0));
ASSERT(false);
} catch (Error& e) {
ASSERT_EQ(e.code(), error_code_operation_failed);
}
}
return Void();
}
TEST_CASE("/DataDistributor/StorageWiggler/Order") {
TEST_CASE("/DataDistribution/StorageWiggler/Order") {
StorageWiggler wiggler(nullptr);
wiggler.addServer(UID(1, 0), StorageMetadataType(1, KeyValueStoreType::SSD_BTREE_V2));
wiggler.addServer(UID(2, 0), StorageMetadataType(2, KeyValueStoreType::MEMORY, true));
@ -1469,7 +1400,7 @@ TEST_CASE("/DataDistributor/StorageWiggler/Order") {
return Void();
}
TEST_CASE("/DataDistributor/Initialization/ResumeFromShard") {
TEST_CASE("/DataDistribution/Initialization/ResumeFromShard") {
state Reference<AsyncVar<ServerDBInfo> const> dbInfo;
state Reference<DataDistributor> self(new DataDistributor(dbInfo, UID()));

View File

@ -68,6 +68,24 @@ ACTOR Future<Void> timeoutWarningCollector(FutureStream<Void> input, double logD
}
}
ACTOR Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures,
int faultTolerance,
Error e,
double waitMultiplierForSlowFutures) {
state std::vector<Future<bool>> successFutures;
state double startTime = now();
successFutures.reserve(futures.size());
for (const auto& future : futures) {
successFutures.push_back(fmap([](auto const& result) { return result.present(); }, future));
}
bool success = wait(quorumEqualsTrue(successFutures, successFutures.size() - faultTolerance));
if (!success) {
throw e;
}
wait(delay((now() - startTime) * waitMultiplierForSlowFutures) || waitForAll(successFutures));
return Void();
}
ACTOR Future<bool> quorumEqualsTrue(std::vector<Future<bool>> futures, int required) {
state std::vector<Future<Void>> true_futures;
state std::vector<Future<Void>> false_futures;
@ -168,6 +186,14 @@ ACTOR Future<Void> testSubscriber(Reference<IAsyncListener<int>> output, Optiona
}
}
static Future<ErrorOr<Void>> goodTestFuture(double duration) {
return tag(delay(duration), ErrorOr<Void>(Void()));
}
static Future<ErrorOr<Void>> badTestFuture(double duration, Error e) {
return tag(delay(duration), ErrorOr<Void>(e));
}
} // namespace
TEST_CASE("/flow/genericactors/AsyncListener") {
@ -181,6 +207,39 @@ TEST_CASE("/flow/genericactors/AsyncListener") {
return Void();
}
TEST_CASE("/flow/genericactors/WaitForMost") {
state std::vector<Future<ErrorOr<Void>>> futures;
{
futures = { goodTestFuture(1), goodTestFuture(2), goodTestFuture(3) };
wait(waitForMost(futures, 1, operation_failed(), 0.0)); // Don't wait for slowest future
ASSERT(!futures[2].isReady());
}
{
futures = { goodTestFuture(1), goodTestFuture(2), goodTestFuture(3) };
wait(waitForMost(futures, 0, operation_failed(), 0.0)); // Wait for all futures
ASSERT(futures[2].isReady());
}
{
futures = { goodTestFuture(1), goodTestFuture(2), goodTestFuture(3) };
wait(waitForMost(futures, 1, operation_failed(), 1.0)); // Wait for slowest future
ASSERT(futures[2].isReady());
}
{
futures = { goodTestFuture(1), goodTestFuture(2), badTestFuture(1, success()) };
wait(waitForMost(futures, 1, operation_failed(), 1.0)); // Error ignored
}
{
futures = { goodTestFuture(1), goodTestFuture(2), badTestFuture(1, success()) };
try {
wait(waitForMost(futures, 0, operation_failed(), 1.0));
ASSERT(false);
} catch (Error& e) {
ASSERT_EQ(e.code(), error_code_operation_failed);
}
}
return Void();
}
#if false
TEST_CASE("/flow/genericactors/generic/storeTuple") {
state std::vector<UID> resA;

View File

@ -858,7 +858,7 @@ Future<Void> timeoutWarningCollector(FutureStream<Void> const& input,
double const& logDelay,
const char* const& context,
UID const& id);
Future<bool> quorumEqualsTrue(std::vector<Future<bool>> const& futures, int const& required);
ACTOR Future<bool> quorumEqualsTrue(std::vector<Future<bool>> futures, int required);
Future<Void> lowPriorityDelay(double const& waitTime);
ACTOR template <class T>
@ -1004,6 +1004,11 @@ Future<Void> waitForAny(std::vector<Future<T>> const& results) {
return quorum(results, 1);
}
ACTOR Future<Void> waitForMost(std::vector<Future<ErrorOr<Void>>> futures,
int faultTolerance,
Error e,
double waitMultiplierForSlowFutures = 1.0);
ACTOR Future<bool> shortCircuitAny(std::vector<Future<bool>> f);
ACTOR template <class T>
@ -2002,6 +2007,7 @@ Future<U> operator>>(Future<T> const& lhs, Future<U> const& rhs) {
* IAsyncListener is similar to AsyncVar, but it decouples the input and output, so the translation unit
* responsible for handling the output does not need to have knowledge of how the output is generated
*/
template <class Output>
class IAsyncListener : public ReferenceCounted<IAsyncListener<Output>> {
public: