Merge pull request #1377 from bnamasivayam/release-6.1

Multi-test processes waits until a timeout if any of the tester proce…
This commit is contained in:
Evan Tschannen 2019-03-28 17:46:17 -07:00 committed by GitHub
commit a30e77b24e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 21 deletions

View File

@ -318,6 +318,11 @@ public:
return waitValueOrSignal(getReply(value), makeDependent<T>(IFailureMonitor::failureMonitor()).onFailedFor(getEndpoint(), sustainedFailureDuration, sustainedFailureSlope), getEndpoint());
}
template <class X>
Future<ErrorOr<X>> getReplyUnlessFailedFor(double sustainedFailureDuration, double sustainedFailureSlope) const {
return getReplyUnlessFailedFor(ReplyPromise<X>(), sustainedFailureDuration, sustainedFailureSlope);
}
explicit RequestStream(const Endpoint& endpoint) : queue(new NetNotifiedQueue<T>(0, 1, endpoint)) {}
FutureStream<T> getFuture() const { queue->addFutureRef(); return FutureStream<T>(queue); }

View File

@ -623,9 +623,18 @@ void logMetrics( vector<PerfMetric> metrics ) {
.detail( "Formatted", format(metrics[idx].format_code().c_str(), metrics[idx].value() ) );
}
template <class T>
void throwIfError(const std::vector<Future<ErrorOr<T>>> &futures, std::string errorMsg) {
for(auto &future:futures) {
if(future.get().isError()) {
TraceEvent(SevError, errorMsg.c_str()).error(future.get().getError());
throw future.get().getError();
}
}
}
ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< TesterInterface > testers,
TestSpec spec ) {
// FIXME: Fault tolerance for test workers (handle nonresponse or broken_promise from each getReply below)
TraceEvent("TestRunning").detail( "WorkloadTitle", printable(spec.title) )
.detail("TesterCount", testers.size()).detail("Phases", spec.phases)
.detail("TestTimeout", spec.timeout);
@ -651,27 +660,29 @@ ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< Test
}
state vector< WorkloadInterface > workloads = wait( getAll( workRequests ) );
state double waitForFailureTime = g_network->isSimulated() ? 24*60*60 : 60;
if( g_network->isSimulated() && spec.simCheckRelocationDuration )
debug_setCheckRelocationDuration( true );
if( spec.phases & TestWorkload::SETUP ) {
std::vector< Future<Void> > setups;
state std::vector< Future<ErrorOr<Void>> > setups;
printf("setting up test (%s)...\n", printable(spec.title).c_str());
TraceEvent("TestSetupStart").detail("WorkloadTitle", printable(spec.title));
for(int i= 0; i < workloads.size(); i++)
setups.push_back( workloads[i].setup.template getReply<Void>() );
setups.push_back( workloads[i].setup.template getReplyUnlessFailedFor<Void>( waitForFailureTime, 0) );
wait( waitForAll( setups ) );
throwIfError(setups, "SetupFailedForWorkload" + printable(spec.title));
TraceEvent("TestSetupComplete").detail("WorkloadTitle", printable(spec.title));
}
if( spec.phases & TestWorkload::EXECUTION ) {
TraceEvent("TestStarting").detail("WorkloadTitle", printable(spec.title));
printf("running test (%s)...\n", printable(spec.title).c_str());
std::vector< Future<Void> > starts;
state std::vector< Future<ErrorOr<Void>> > starts;
for(int i= 0; i < workloads.size(); i++)
starts.push_back( workloads[i].start.template getReply<Void>() );
starts.push_back( workloads[i].start.template getReplyUnlessFailedFor<Void>(waitForFailureTime, 0) );
wait( waitForAll( starts ) );
throwIfError(starts, "StartFailedForWorkload" + printable(spec.title));
printf("%s complete\n", printable(spec.title).c_str());
TraceEvent("TestComplete").detail("WorkloadTitle", printable(spec.title));
}
@ -681,15 +692,16 @@ ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< Test
wait( delay(3.0) );
}
state std::vector< Future<bool> > checks;
state std::vector< Future<ErrorOr<bool>> > checks;
TraceEvent("CheckingResults");
printf("checking test (%s)...\n", printable(spec.title).c_str());
for(int i= 0; i < workloads.size(); i++)
checks.push_back( workloads[i].check.template getReply<bool>() );
checks.push_back( workloads[i].check.template getReplyUnlessFailedFor<bool>(waitForFailureTime, 0) );
wait( waitForAll( checks ) );
throwIfError(checks, "CheckFailedForWorkload" + printable(spec.title));
for(int i = 0; i < checks.size(); i++) {
if(checks[i].get())
if(checks[i].get().get())
success++;
else
failure++;
@ -697,21 +709,15 @@ ACTOR Future<DistributedTestResults> runWorkload( Database cx, std::vector< Test
}
if( spec.phases & TestWorkload::METRICS ) {
state std::vector< Future<vector<PerfMetric>> > metricTasks;
state std::vector< Future<ErrorOr<vector<PerfMetric>>> > metricTasks;
printf("fetching metrics (%s)...\n", printable(spec.title).c_str());
TraceEvent("TestFetchingMetrics").detail("WorkloadTitle", printable(spec.title));
for(int i= 0; i < workloads.size(); i++)
metricTasks.push_back( workloads[i].metrics.template getReply<vector<PerfMetric>>() );
wait( waitForAllReady( metricTasks ) );
int failedMetrics = 0;
metricTasks.push_back( workloads[i].metrics.template getReplyUnlessFailedFor<vector<PerfMetric>>(waitForFailureTime, 0) );
wait( waitForAll( metricTasks ) );
throwIfError(metricTasks, "MetricFailedForWorkload" + printable(spec.title));
for(int i = 0; i < metricTasks.size(); i++) {
if(!metricTasks[i].isError())
metricsResults.push_back( metricTasks[i].get() );
else
TraceEvent(SevError, "TestFailure")
.error(metricTasks[i].getError())
.detail("Reason", "Metrics not retrieved")
.detail("From", workloads[i].metrics.getEndpoint().getPrimaryAddress());
metricsResults.push_back( metricTasks[i].get().get() );
}
}