Merge pull request #3530 from xumengpanda/mengxu/ssfailure-fix-PR
Fix wild pointer to destructed DD that causes DD crash rarely
commit b0983f4a9a
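Context for reviewers: the primary and remote DDTeamCollection objects point at each other through raw pointers in their teamCollections vectors, so destroying one leaves the other holding a dangling pointer that a still-running teamTracker can dereference. Below is a minimal, self-contained sketch of that failure mode and of the fix, with a hypothetical Collection type standing in for DDTeamCollection:

```cpp
#include <cstdio>
#include <vector>

// Hypothetical stand-in for DDTeamCollection: each collection keeps raw
// pointers to its peers (e.g. primary and remote), like teamCollections.
struct Collection {
    std::vector<Collection*> peers;

    ~Collection() {
        // The fix in this PR: before this object's memory is freed, null out
        // every back-reference a surviving peer holds to it. A tracker that
        // still fires on the peer side can then detect nullptr instead of
        // dereferencing freed memory.
        for (Collection* peer : peers) {
            if (peer != nullptr && peer != this) {
                for (Collection*& slot : peer->peers) {
                    if (slot == this) slot = nullptr;
                }
            }
        }
    }
};

int main() {
    Collection* primary = new Collection;
    Collection* remote = new Collection;
    primary->peers = { primary, remote };
    remote->peers = { primary, remote };

    delete primary; // remote->peers[0] becomes nullptr instead of dangling
    std::printf("remote's view of primary: %p\n", (void*)remote->peers[0]);
    delete remote;
}
```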
fdbserver/DataDistribution.actor.cpp
@@ -667,21 +667,49 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
 	}
 
 	~DDTeamCollection() {
-		// The following kills a reference cycle between the teamTracker actor and the TCTeamInfo that both holds and is held by the actor
-		// It also ensures that the trackers are done fiddling with healthyTeamCount before we free this
+		TraceEvent("DDTeamCollectionDestructed", distributorId).detail("Primary", primary);
+		// Other teamCollections also hold pointers to this teamCollection;
+		// TeamTracker may access the destructed DDTeamCollection if we do not reset the pointer
+		for (int i = 0; i < teamCollections.size(); i++) {
+			if (teamCollections[i] != nullptr && teamCollections[i] != this) {
+				for (int j = 0; j < teamCollections[i]->teamCollections.size(); ++j) {
+					if (teamCollections[i]->teamCollections[j] == this) {
+						teamCollections[i]->teamCollections[j] = nullptr;
+					}
+				}
+			}
+		}
+		// Team tracker has pointers to DDTeamCollections both in primary and remote.
+		// The following kills a reference cycle between the teamTracker actor and the TCTeamInfo that both holds and
+		// is held by the actor. It also ensures that the trackers are done fiddling with healthyTeamCount before we
+		// free this.
 		for(int i=0; i < teams.size(); i++) {
 			teams[i]->tracker.cancel();
 		}
+		// The commented TraceEvent log is useful in detecting what is running during the destruction
+		// TraceEvent("DDTeamCollectionDestructed", distributorId)
+		//     .detail("Primary", primary)
+		//     .detail("TeamTrackerDestroyed", teams.size());
 		for(int i=0; i < badTeams.size(); i++) {
 			badTeams[i]->tracker.cancel();
 		}
-		// The following makes sure that, even if a reference to a team is held in the DD Queue, the tracker will be stopped
+		// TraceEvent("DDTeamCollectionDestructed", distributorId)
+		//     .detail("Primary", primary)
+		//     .detail("BadTeamTrackerDestroyed", badTeams.size());
+		// The following makes sure that, even if a reference to a team is held in the DD Queue, the tracker will be
+		// stopped
 		// before the server_status map to which it has a pointer, is destroyed.
 		for(auto it = server_info.begin(); it != server_info.end(); ++it) {
 			it->second->tracker.cancel();
 			it->second->collection = nullptr;
 		}
+		// TraceEvent("DDTeamCollectionDestructed", distributorId)
+		//     .detail("Primary", primary)
+		//     .detail("ServerTrackerDestroyed", server_info.size());
 		teamBuilder.cancel();
+		// TraceEvent("DDTeamCollectionDestructed", distributorId)
+		//     .detail("Primary", primary)
+		//     .detail("TeamBuilderDestroyed", server_info.size());
 	}
 
 	void addLaggingStorageServer(Key zoneId) {
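The "reference cycle" comments above are doing real work: each TCTeamInfo owns its teamTracker future, while the running tracker actor holds a reference back to the team, so neither can be freed until the tracker is cancelled. A rough shared_ptr analogy (hypothetical Team/TrackerActor types; flow actors are not shared_ptr-based):

```cpp
#include <cstdio>
#include <memory>

struct Team;

// Stand-in for the teamTracker actor: while "running" it keeps its team alive.
struct TrackerActor {
    std::shared_ptr<Team> team;
    void cancel() { team.reset(); } // analogous to cancelling the tracker future
};

struct Team {
    std::shared_ptr<TrackerActor> tracker; // the team owns its tracker
};

int main() {
    auto team = std::make_shared<Team>();
    team->tracker = std::make_shared<TrackerActor>();
    team->tracker->team = team; // reference cycle: Team <-> TrackerActor

    team->tracker->cancel(); // break the cycle, as ~DDTeamCollection() does
    std::printf("use_count after cancel: %ld\n", team.use_count());
    team.reset(); // now both objects are actually destroyed
}
```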
@@ -1424,6 +1452,12 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
 	void traceAllInfo(bool shouldPrint = false) {
 
 		if (!shouldPrint) return;
+		// Record all team collections IDs
+		for (int i = 0; i < teamCollections.size(); ++i) {
+			TraceEvent("TraceAllInfo", distributorId)
+			    .detail("TeamCollectionIndex", i)
+			    .detail("Primary", teamCollections[i]->primary);
+		}
 
 		TraceEvent("TraceAllInfo", distributorId).detail("Primary", primary);
 		traceConfigInfo();
@@ -3127,7 +3161,9 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 						// The shard split/merge and DD rebooting may make a shard mapped to multiple teams,
 						// so we need to recalculate the shard's priority
 						if (maxPriority < SERVER_KNOBS->PRIORITY_TEAM_FAILED) {
-							auto teams = self->shardsAffectedByTeamFailure->getTeamsFor( shards[i] );
+							std::pair<vector<ShardsAffectedByTeamFailure::Team>,
+							          vector<ShardsAffectedByTeamFailure::Team>>
+							    teams = self->shardsAffectedByTeamFailure->getTeamsFor(shards[i]);
 							for( int j=0; j < teams.first.size()+teams.second.size(); j++) {
 								// t is the team in primary DC or the remote DC
 								auto& t = j < teams.first.size() ? teams.first[j] : teams.second[j-teams.first.size()];
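The replacement only spells out the type that `auto` was deducing: getTeamsFor returns a pair of team vectors, primary-DC and remote-DC, which the loop walks as one sequence. A small stand-alone sketch of that indexing pattern, with plain ints standing in for teams:

```cpp
#include <cstdio>
#include <vector>

// Sketch of the iteration above: walk two vectors (primary-DC teams and
// remote-DC teams) as a single sequence and recompute the maximum priority.
// Hypothetical data; the real Team type carries server IDs.
int main() {
    std::vector<int> primaryTeams = { 1, 3 };
    std::vector<int> remoteTeams = { 2, 7, 4 };

    int maxPriority = 0;
    for (size_t j = 0; j < primaryTeams.size() + remoteTeams.size(); j++) {
        // t is the team in the primary DC or the remote DC
        int& t = j < primaryTeams.size() ? primaryTeams[j]
                                         : remoteTeams[j - primaryTeams.size()];
        if (t > maxPriority) maxPriority = t;
    }
    std::printf("max priority across both DCs: %d\n", maxPriority);
}
```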
@@ -3137,7 +3173,16 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 							}
 
 							auto tc = self->teamCollections[t.primary ? 0 : 1];
+							if (tc == nullptr) {
+								// teamTracker only works when all teamCollections are valid.
+								// Always check if all teamCollections are valid, and throw error if any
+								// teamCollection has been destructed, because the teamTracker can be triggered
+								// after a DDTeamCollection was destroyed and before the other DDTeamCollection is
+								// destroyed. Do not throw actor_cancelled() because flow treats it differently.
+								throw dd_cancelled();
+							}
 							ASSERT(tc->primary == t.primary);
+							// tc->traceAllInfo();
 							if( tc->server_info.count( t.servers[0] ) ) {
 								auto& info = tc->server_info[t.servers[0]];
 
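This nullptr check is the second half of the fix: the destructor resets the cross-collection pointers, and the tracker treats a null back-pointer as a signal to stop. A self-contained sketch with a hypothetical DDCancelled exception in place of the new dd_cancelled() flow error (preferred over actor_cancelled because flow handles that error specially):

```cpp
#include <cstdio>
#include <stdexcept>

// Hypothetical stand-in for the dd_cancelled() flow error.
struct DDCancelled : std::runtime_error {
    DDCancelled() : std::runtime_error("data distribution cancelled") {}
};

struct Collection; // would be DDTeamCollection in the real code

struct Tracker {
    Collection* collection; // back-pointer that teardown may reset to nullptr

    void poll() {
        // Mirrors the teamTracker fix: if the collection was already
        // destructed, bail out with a dedicated error instead of touching
        // freed memory.
        if (collection == nullptr) throw DDCancelled();
        // ... otherwise use *collection ...
    }
};

int main() {
    Tracker t{ nullptr }; // simulate the tracker firing after teardown began
    try {
        t.poll();
    } catch (const DDCancelled& e) {
        std::printf("tracker stopped cleanly: %s\n", e.what());
    }
}
```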
@@ -3146,6 +3191,7 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 								if( info->teams[k]->getServerIDs() == t.servers ) {
 									maxPriority = std::max( maxPriority, info->teams[k]->getPriority() );
 									found = true;
+									break;
 								}
 							}
 
@@ -3170,15 +3216,15 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 						rs.priority = maxPriority;
 
 						self->output.send(rs);
-						if(deterministicRandom()->random01() < 0.01) {
-							TraceEvent("SendRelocateToDDQx100", self->distributorId)
-								.detail("Team", team->getDesc())
-								.detail("KeyBegin", rs.keys.begin)
-								.detail("KeyEnd", rs.keys.end)
-								.detail("Priority", rs.priority)
-								.detail("TeamFailedMachines", team->size() - serversLeft)
-								.detail("TeamOKMachines", serversLeft);
-						}
+						TraceEvent("SendRelocateToDDQueue", self->distributorId)
+						    .suppressFor(1.0)
+						    .detail("Primary", self->primary)
+						    .detail("Team", team->getDesc())
+						    .detail("KeyBegin", rs.keys.begin)
+						    .detail("KeyEnd", rs.keys.end)
+						    .detail("Priority", rs.priority)
+						    .detail("TeamFailedMachines", team->size() - serversLeft)
+						    .detail("TeamOKMachines", serversLeft);
 					}
 				} else {
 					if(logTeamEvents) {
@@ -3193,7 +3239,10 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 			}
 		} catch(Error& e) {
 			if(logTeamEvents) {
-				TraceEvent("TeamTrackerStopping", self->distributorId).detail("Team", team->getDesc()).detail("Priority", team->getPriority());
+				TraceEvent("TeamTrackerStopping", self->distributorId)
+				    .detail("Primary", self->primary)
+				    .detail("Team", team->getDesc())
+				    .detail("Priority", team->getPriority());
 			}
 			self->priority_teams[team->getPriority()]--;
 			if (team->isHealthy()) {
@@ -3201,7 +3250,9 @@ ACTOR Future<Void> teamTracker(DDTeamCollection* self, Reference<TCTeamInfo> tea
 				ASSERT( self->healthyTeamCount >= 0 );
 
 				if( self->healthyTeamCount == 0 ) {
-					TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->distributorId).detail("SignallingTeam", team->getDesc());
+					TraceEvent(SevWarn, "ZeroTeamsHealthySignalling", self->distributorId)
+					    .detail("Primary", self->primary)
+					    .detail("SignallingTeam", team->getDesc());
 					self->zeroHealthyTeams->set(true);
 				}
 			}
@@ -3852,6 +3903,10 @@ ACTOR Future<Void> storageServerTracker(
 	} catch( Error &e ) {
 		if (e.code() != error_code_actor_cancelled && errorOut.canBeSet())
 			errorOut.sendError(e);
+		TraceEvent("StorageServerTrackerCancelled", self->distributorId)
+		    .suppressFor(1.0)
+		    .detail("Primary", self->primary)
+		    .detail("Server", server->id);
 		throw;
 	}
 }
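The catch block above shows the trackers' error-propagation convention: genuine failures are forwarded to the shared errorOut promise at most once, while cancellation is rethrown quietly as part of normal teardown. A toy version with hypothetical types and codes (flow's Error and Promise machinery is not reproduced here):

```cpp
#include <cstdio>

// Hypothetical error-code constant; flow defines error_code_actor_cancelled.
constexpr int kActorCancelled = -1;

// Toy stand-in for Promise<Void> errorOut: settable at most once.
struct ErrorOut {
    bool set = false;
    bool canBeSet() const { return !set; }
    void sendError(int code) {
        set = true;
        std::printf("fatal error %d surfaced to the distributor\n", code);
    }
};

void onTrackerError(ErrorOut& errorOut, int code) {
    // Same shape as the catch block above: cancellation is expected during
    // teardown and is not treated as a failure of data distribution.
    if (code != kActorCancelled && errorOut.canBeSet())
        errorOut.sendError(code);
}

int main() {
    ErrorOut out;
    onTrackerError(out, kActorCancelled); // ignored: normal teardown
    onTrackerError(out, 1052);            // forwarded once
    onTrackerError(out, 1052);            // dropped: canBeSet() is now false
}
```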
@@ -4637,6 +4692,7 @@ ACTOR Future<Void> dataDistribution(Reference<DataDistributorData> self, Promise
 	}
 	catch( Error &e ) {
 		state Error err = e;
+		TraceEvent("DataDistributorDestroyTeamCollections").error(e);
 		self->teamCollection = nullptr;
 		primaryTeamCollection = Reference<DDTeamCollection>();
 		remoteTeamCollection = Reference<DDTeamCollection>();

flow/error_definitions.h
@@ -30,6 +30,7 @@
 
 // 1xxx Normal failure (plausibly these should not even be "errors", but they are failures of
 // the way operations are currently defined)
+// clang-format off
 ERROR( success, 0, "Success" )
 ERROR( end_of_stream, 1, "End of stream" )
 ERROR( operation_failed, 1000, "Operation failed")
@@ -70,6 +71,7 @@ ERROR( connection_unreferenced, 1048, "No peer references for connection" )
 ERROR( connection_idle, 1049, "Connection closed after idle timeout" )
 ERROR( disk_adapter_reset, 1050, "The disk queue adpater reset" )
 ERROR( batch_transaction_throttled, 1051, "Batch GRV request rate limit exceeded")
+ERROR( dd_cancelled, 1052, "Data distribution components cancelled")
 
 ERROR( broken_promise, 1100, "Broken promise" )
 ERROR( operation_cancelled, 1101, "Asynchronous operation cancelled" )
@@ -227,6 +229,7 @@ ERROR( snap_with_recovery_unsupported, 2508, "Cluster recovery during snapshot o
 // 4xxx Internal errors (those that should be generated only by bugs) are decimal 4xxx
 ERROR( unknown_error, 4000, "An unknown error occurred" ) // C++ exception not of type Error
 ERROR( internal_error, 4100, "An internal error occurred" )
+// clang-format on
 
 #undef ERROR
 #endif

flow/flow.h
@@ -408,7 +408,7 @@ struct SingleCallback {
 	}
 };
 
-// SAV is short for Single Assigment Variable: It can be assigned for only once!
+// SAV is short for Single Assignment Variable: It can be assigned for only once!
 template <class T>
 struct SAV : private Callback<T>, FastAllocated<SAV<T>> {
 	int promises; // one for each promise (and one for an active actor if this is an actor)
@@ -794,7 +794,9 @@ public:
 	bool canBeSet() { return sav->canBeSet(); }
 	bool isValid() const { return sav != NULL; }
 	Promise() : sav(new SAV<T>(0, 1)) {}
-	Promise(const Promise& rhs) : sav(rhs.sav) { sav->addPromiseRef(); }
+	Promise(const Promise& rhs) : sav(rhs.sav) {
+		sav->addPromiseRef();
+	}
 	Promise(Promise&& rhs) BOOST_NOEXCEPT : sav(rhs.sav) { rhs.sav = 0; }
 	~Promise() { if (sav) sav->delPromiseRef(); }
 
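The reformatted copy constructor is a convenient place to recall what addPromiseRef()/delPromiseRef() track: a SAV counts promise (writer) references separately from future (reader) references, and when the last promise dies without setting a value, waiters get broken_promise. A toy illustration of that bookkeeping, not flow's implementation:

```cpp
#include <cstdio>

// Toy single-assignment variable: tracks writers (promises) separately from
// readers (futures), echoing flow's SAV. Illustration only.
struct ToySAV {
    int promises = 1; // the initial Promise, as in SAV<T>(0, 1)
    bool isSet = false;

    void addPromiseRef() { ++promises; }     // Promise copy constructor
    void delPromiseRef() {                   // Promise destructor
        if (--promises == 0 && !isSet) {
            // No writer remains; in flow, waiting futures would now receive
            // the broken_promise error.
            std::puts("broken promise: value can never be set");
        }
    }
    void send(int) { isSet = true; }
};

int main() {
    ToySAV sav;          // one Promise alive
    sav.addPromiseRef(); // a Promise copy is made
    sav.delPromiseRef(); // the copy is destroyed; original still alive
    sav.delPromiseRef(); // last Promise destroyed without send(): broken
}
```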
flow/genericactors.actor.h
@@ -613,11 +613,11 @@ protected:
 		Promise<Void> noDestroy = trigger; // The send(Void()) or even V::operator= could cause destroyOnCancel,
 		                                   // which could undo the change to i.value here. Keeping the promise reference count >= 2
 		                                   // prevents destroyOnCancel from erasing anything from the map.
 
-		if (v == defaultValue)
+		if (v == defaultValue) {
 			items.erase(k);
-		else
+		} else {
 			i.value = v;
+		}
 
 		trigger.send(Void());
 	}
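The noDestroy comment above captures a subtle idiom: copying the trigger promise keeps its reference count at two or more across the map update, so a reentrant destroyOnCancel (which can run from send() or from the assignment) cannot drop the last reference and erase the entry mid-update. A loose shared_ptr analogy (hypothetical; the real code relies on flow promises, not shared_ptr):

```cpp
#include <cstdio>
#include <map>
#include <memory>

int main() {
    std::map<int, std::shared_ptr<int>> items;
    items[1] = std::make_shared<int>(10);

    // Take the entry's handle, then a second reference to it, mirroring
    // `Promise<Void> noDestroy = trigger;`: the payload's use count stays
    // >= 2, so no cleanup path that drops a single reference can free it
    // while we are still mutating the map.
    std::shared_ptr<int> trigger = items[1];
    std::shared_ptr<int> noDestroy = trigger;

    items.erase(1); // drops the map's reference; the payload survives
    std::printf("use_count=%ld value=%d\n", trigger.use_count(), *trigger);
}
```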