Fixed a crash related to destruction order in data distribution

This commit is contained in:
Evan Tschannen 2020-05-10 23:14:19 -07:00
parent f17f00fdd5
commit 48b1b20f67
2 changed files with 17 additions and 10 deletions

View File

@ -679,8 +679,8 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
// before the server_status map to which it has a pointer, is destroyed.
for(auto it = server_info.begin(); it != server_info.end(); ++it) {
it->second->tracker.cancel();
it->second->collection = nullptr;
}
teamBuilder.cancel();
}
@ -2570,7 +2570,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
TCServerInfo::~TCServerInfo() {
if (ssVersionTooFarBehind.get()) {
if (collection && ssVersionTooFarBehind.get()) {
collection->removeLaggingStorageServer(lastKnownInterface.locality.zoneId().get());
}
}
@ -4446,6 +4446,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
state DatabaseConfiguration configuration;
state Reference<InitialDataDistribution> initData;
state MoveKeysLock lock;
state Reference<DDTeamCollection> primaryTeamCollection;
state Reference<DDTeamCollection> remoteTeamCollection;
loop {
try {
loop {
@ -4607,10 +4609,10 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
actors.push_back( reportErrorsExcept( dataDistributionQueue( cx, output, input.getFuture(), getShardMetrics, processingUnhealthy, tcis, shardsAffectedByTeamFailure, lock, getAverageShardBytes, self->ddId, storageTeamSize, configuration.storageTeamSize, &lastLimited ), "DDQueue", self->ddId, &normalDDQueueErrors() ) );
vector<DDTeamCollection*> teamCollectionsPtrs;
Reference<DDTeamCollection> primaryTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
primaryTeamCollection = Reference<DDTeamCollection>( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, primaryDcId, configuration.usableRegions > 1 ? remoteDcIds : std::vector<Optional<Key>>(), readyToStart.getFuture(), zeroHealthyTeams[0], true, processingUnhealthy) );
teamCollectionsPtrs.push_back(primaryTeamCollection.getPtr());
if (configuration.usableRegions > 1) {
Reference<DDTeamCollection> remoteTeamCollection( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), readyToStart.getFuture() && remoteRecovered(self->dbInfo), zeroHealthyTeams[1], false, processingUnhealthy) );
remoteTeamCollection = Reference<DDTeamCollection>( new DDTeamCollection(cx, self->ddId, lock, output, shardsAffectedByTeamFailure, configuration, remoteDcIds, Optional<std::vector<Optional<Key>>>(), readyToStart.getFuture() && remoteRecovered(self->dbInfo), zeroHealthyTeams[1], false, processingUnhealthy) );
teamCollectionsPtrs.push_back(remoteTeamCollection.getPtr());
remoteTeamCollection->teamCollections = teamCollectionsPtrs;
actors.push_back( reportErrorsExcept( dataDistributionTeamCollection( remoteTeamCollection, initData, tcis[1], self->dbInfo ), "DDTeamCollectionSecondary", self->ddId, &normalDDQueueErrors() ) );
@ -4626,6 +4628,8 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self)
catch( Error &e ) {
state Error err = e;
self->teamCollection = nullptr;
primaryTeamCollection = Reference<DDTeamCollection>();
remoteTeamCollection = Reference<DDTeamCollection>();
if( e.code() != error_code_movekeys_conflict )
throw err;
bool ddEnabled = wait( isDataDistributionEnabled(cx) );

View File

@ -1223,6 +1223,7 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state Optional<Reference<IDataDistributionTeam>> randomTeam;
state bool moved = false;
state TraceEvent traceEvent("BgDDMountainChopper", self->distributorId);
traceEvent.suppressFor(5.0)
@ -1258,15 +1259,15 @@ ACTOR Future<Void> BgDDMountainChopper( DDQueueData* self, int teamCollectionInd
traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_OVERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
Optional<Reference<IDataDistributionTeam>> _randomTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, true, false))));
randomTeam = _randomTeam;
traceEvent.detail("DestTeam", printable(randomTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (randomTeam.present()) {
state Optional<Reference<IDataDistributionTeam>> loadedTeam =
Optional<Reference<IDataDistributionTeam>> loadedTeam =
wait(brokenPromiseToNever(self->teamCollections[teamCollectionIndex].getTeam.getReply(
GetTeamRequest(true, true, false, true))));
@ -1320,7 +1321,9 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
state Transaction tr(self->cx);
state double lastRead = 0;
state bool skipCurrentLoop = false;
loop {
state Optional<Reference<IDataDistributionTeam>> randomTeam;
state bool moved = false;
state TraceEvent traceEvent("BgDDValleyFiller", self->distributorId);
traceEvent.suppressFor(5.0)
@ -1356,15 +1359,15 @@ ACTOR Future<Void> BgDDValleyFiller( DDQueueData* self, int teamCollectionIndex)
traceEvent.detail("QueuedRelocations", self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM]);
if (self->priority_relocations[SERVER_KNOBS->PRIORITY_REBALANCE_UNDERUTILIZED_TEAM] <
SERVER_KNOBS->DD_REBALANCE_PARALLELISM) {
state Optional<Reference<IDataDistributionTeam>> randomTeam = wait(brokenPromiseToNever(
Optional<Reference<IDataDistributionTeam>> _randomTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, false, false, true))));
randomTeam = _randomTeam;
traceEvent.detail("SourceTeam", printable(randomTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){
return team->getDesc();
})));
if (randomTeam.present()) {
state Optional<Reference<IDataDistributionTeam>> unloadedTeam = wait(brokenPromiseToNever(
Optional<Reference<IDataDistributionTeam>> unloadedTeam = wait(brokenPromiseToNever(
self->teamCollections[teamCollectionIndex].getTeam.getReply(GetTeamRequest(true, true, true, false))));
traceEvent.detail("DestTeam", printable(unloadedTeam.map<std::string>([](const Reference<IDataDistributionTeam>& team){