Fix timeout error due to lost exception
Found in tests, a move key conflict exception was not handled because the Future object was not waited by someone. As a result, the data distributor did not die and database checking couldn't get the metric and keep trying until timeout.
This commit is contained in:
parent
c38b2a8c38
commit
99e109d6c5
|
@ -3379,6 +3379,7 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
|
||||||
state PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
|
state PromiseStream< std::pair<UID, Optional<StorageServerInterface>> > ddStorageServerChanges;
|
||||||
state double lastLimited = 0;
|
state double lastLimited = 0;
|
||||||
state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
|
state Future<Void> distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
|
||||||
|
self->addActor.send( distributor );
|
||||||
self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), self->configuration->get(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) );
|
self->addActor.send( reportErrorsExcept( rateKeeper( self->dbInfo, ddStorageServerChanges, di.getRateInfo.getFuture(), self->configuration->get(), &lastLimited ), "Ratekeeper", di.id(), &normalRateKeeperErrors() ) );
|
||||||
|
|
||||||
state Future<Void> reply;
|
state Future<Void> reply;
|
||||||
|
@ -3397,22 +3398,25 @@ ACTOR Future<Void> dataDistributor(DataDistributorInterface di, Reference<AsyncV
|
||||||
|
|
||||||
trigger = self->configurationTrigger.onTrigger();
|
trigger = self->configurationTrigger.onTrigger();
|
||||||
choose {
|
choose {
|
||||||
when (wait(brokenPromiseToNever(reply))) {
|
when ( wait( brokenPromiseToNever(reply) ) ) {
|
||||||
TraceEvent("DataDistributorRejoined", di.id())
|
TraceEvent("DataDistributorRejoined", di.id())
|
||||||
.detail("ClusterControllerID", lastClusterControllerID);
|
.detail("ClusterControllerID", lastClusterControllerID);
|
||||||
}
|
}
|
||||||
when (wait(self->dbInfo->onChange())) {
|
when ( wait( self->dbInfo->onChange() ) ) {
|
||||||
const DataDistributorInterface& distributor = self->dbInfo->get().distributor;
|
const DataDistributorInterface& distributor = self->dbInfo->get().distributor;
|
||||||
if ( distributor.isValid() && distributor.id() != di.id() ) {
|
if ( distributor.isValid() && distributor.id() != di.id() ) {
|
||||||
TraceEvent("DataDistributor", di.id()).detail("FoundAnotherDdID", distributor.id());
|
TraceEvent("DataDistributorExit", di.id()).detail("CurrentLiveID", distributor.id());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
when (wait(trigger)) {
|
when ( wait( trigger ) ) {
|
||||||
|
TraceEvent("DataDistributorRestart", di.id())
|
||||||
|
.detail("ClusterControllerID", lastClusterControllerID);
|
||||||
self->refreshDcIds();
|
self->refreshDcIds();
|
||||||
distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
|
distributor = reportErrorsExcept( dataDistribution( self->dbInfo, di.id(), self->configuration->get(), ddStorageServerChanges, self->primaryDcId, self->remoteDcIds, &lastLimited ), "DataDistribution", di.id(), &normalDataDistributorErrors() );
|
||||||
|
self->addActor.send( distributor );
|
||||||
}
|
}
|
||||||
when (wait(collection)) {
|
when ( wait( collection ) ) {
|
||||||
ASSERT(false);
|
ASSERT(false);
|
||||||
throw internal_error();
|
throw internal_error();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue