diff --git a/fdbrpc/fdbrpc.h b/fdbrpc/fdbrpc.h index 6edfefc90b..0240b97b6f 100644 --- a/fdbrpc/fdbrpc.h +++ b/fdbrpc/fdbrpc.h @@ -318,6 +318,11 @@ public: return waitValueOrSignal(getReply(value), makeDependent(IFailureMonitor::failureMonitor()).onFailedFor(getEndpoint(), sustainedFailureDuration, sustainedFailureSlope), getEndpoint()); } + template + Future> getReplyUnlessFailedFor(double sustainedFailureDuration, double sustainedFailureSlope) const { + return getReplyUnlessFailedFor(ReplyPromise(), sustainedFailureDuration, sustainedFailureSlope); + } + explicit RequestStream(const Endpoint& endpoint) : queue(new NetNotifiedQueue(0, 1, endpoint)) {} FutureStream getFuture() const { queue->addFutureRef(); return FutureStream(queue); } diff --git a/fdbserver/tester.actor.cpp b/fdbserver/tester.actor.cpp index 7dd66ccc99..5e7fc2e226 100644 --- a/fdbserver/tester.actor.cpp +++ b/fdbserver/tester.actor.cpp @@ -623,9 +623,18 @@ void logMetrics( vector metrics ) { .detail( "Formatted", format(metrics[idx].format_code().c_str(), metrics[idx].value() ) ); } +template +void throwIfError(const std::vector>> &futures, std::string errorMsg) { + for(auto &future:futures) { + if(future.get().isError()) { + TraceEvent(SevError, errorMsg.c_str()).error(future.get().getError()); + throw future.get().getError(); + } + } +} + ACTOR Future runWorkload( Database cx, std::vector< TesterInterface > testers, TestSpec spec ) { - // FIXME: Fault tolerance for test workers (handle nonresponse or broken_promise from each getReply below) TraceEvent("TestRunning").detail( "WorkloadTitle", printable(spec.title) ) .detail("TesterCount", testers.size()).detail("Phases", spec.phases) .detail("TestTimeout", spec.timeout); @@ -651,27 +660,29 @@ ACTOR Future runWorkload( Database cx, std::vector< Test } state vector< WorkloadInterface > workloads = wait( getAll( workRequests ) ); - + state double waitForFailureTime = g_network->isSimulated() ? 24*60*60 : 60; if( g_network->isSimulated() && spec.simCheckRelocationDuration ) debug_setCheckRelocationDuration( true ); if( spec.phases & TestWorkload::SETUP ) { - std::vector< Future > setups; + state std::vector< Future> > setups; printf("setting up test (%s)...\n", printable(spec.title).c_str()); TraceEvent("TestSetupStart").detail("WorkloadTitle", printable(spec.title)); for(int i= 0; i < workloads.size(); i++) - setups.push_back( workloads[i].setup.template getReply() ); + setups.push_back( workloads[i].setup.template getReplyUnlessFailedFor( waitForFailureTime, 0) ); wait( waitForAll( setups ) ); + throwIfError(setups, "SetupFailedForWorkload" + printable(spec.title)); TraceEvent("TestSetupComplete").detail("WorkloadTitle", printable(spec.title)); } if( spec.phases & TestWorkload::EXECUTION ) { TraceEvent("TestStarting").detail("WorkloadTitle", printable(spec.title)); printf("running test (%s)...\n", printable(spec.title).c_str()); - std::vector< Future > starts; + state std::vector< Future> > starts; for(int i= 0; i < workloads.size(); i++) - starts.push_back( workloads[i].start.template getReply() ); + starts.push_back( workloads[i].start.template getReplyUnlessFailedFor(waitForFailureTime, 0) ); wait( waitForAll( starts ) ); + throwIfError(starts, "StartFailedForWorkload" + printable(spec.title)); printf("%s complete\n", printable(spec.title).c_str()); TraceEvent("TestComplete").detail("WorkloadTitle", printable(spec.title)); } @@ -681,15 +692,16 @@ ACTOR Future runWorkload( Database cx, std::vector< Test wait( delay(3.0) ); } - state std::vector< Future > checks; + state std::vector< Future> > checks; TraceEvent("CheckingResults"); printf("checking test (%s)...\n", printable(spec.title).c_str()); for(int i= 0; i < workloads.size(); i++) - checks.push_back( workloads[i].check.template getReply() ); + checks.push_back( workloads[i].check.template getReplyUnlessFailedFor(waitForFailureTime, 0) ); wait( waitForAll( checks ) ); - + throwIfError(checks, "CheckFailedForWorkload" + printable(spec.title)); + for(int i = 0; i < checks.size(); i++) { - if(checks[i].get()) + if(checks[i].get().get()) success++; else failure++; @@ -697,21 +709,15 @@ ACTOR Future runWorkload( Database cx, std::vector< Test } if( spec.phases & TestWorkload::METRICS ) { - state std::vector< Future> > metricTasks; + state std::vector< Future>> > metricTasks; printf("fetching metrics (%s)...\n", printable(spec.title).c_str()); TraceEvent("TestFetchingMetrics").detail("WorkloadTitle", printable(spec.title)); for(int i= 0; i < workloads.size(); i++) - metricTasks.push_back( workloads[i].metrics.template getReply>() ); - wait( waitForAllReady( metricTasks ) ); - int failedMetrics = 0; + metricTasks.push_back( workloads[i].metrics.template getReplyUnlessFailedFor>(waitForFailureTime, 0) ); + wait( waitForAll( metricTasks ) ); + throwIfError(metricTasks, "MetricFailedForWorkload" + printable(spec.title)); for(int i = 0; i < metricTasks.size(); i++) { - if(!metricTasks[i].isError()) - metricsResults.push_back( metricTasks[i].get() ); - else - TraceEvent(SevError, "TestFailure") - .error(metricTasks[i].getError()) - .detail("Reason", "Metrics not retrieved") - .detail("From", workloads[i].metrics.getEndpoint().getPrimaryAddress()); + metricsResults.push_back( metricTasks[i].get().get() ); } }