Merge pull request #2286 from etschannen/feature-immediate-undesired

Two bug fixes
This commit is contained in:
Evan Tschannen 2019-10-24 10:02:44 -07:00 committed by GitHub
commit a2bc4173a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 16 deletions

View File

@ -3175,8 +3175,21 @@ ACTOR Future<Void> serverMetricsPolling( TCServerInfo *server) {
}
}
//Returns the KeyValueStoreType of server if it is different from self->storeType
ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
//Returns if the KeyValueStoreType of server is different from self->storeType or the desired datacenter does not match
ACTOR Future<Void> keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) {
if ((!self->includedDCs.empty() &&
std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) ==
self->includedDCs.end()) ||
(!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality))) {
TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("StoreType", "?")
.detail("DesiredType", self->configuration.storageServerStoreType.toString())
.detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy,
server->lastKnownInterface.locality));
return Void();
}
state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID<KeyValueStoreType>(TaskPriority::DataDistribution)));
if (type == self->configuration.storageServerStoreType &&
(self->includedDCs.empty() ||
@ -3186,7 +3199,14 @@ ACTOR Future<KeyValueStoreType> keyValueStoreTypeTracker(DDTeamCollection* self,
wait(Future<Void>(Never()));
}
return type;
TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("StoreType", type.toString())
.detail("DesiredType", self->configuration.storageServerStoreType.toString())
.detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy,
server->lastKnownInterface.locality));
return Void();
}
ACTOR Future<Void> waitForAllDataRemoved( Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams ) {
@ -3302,7 +3322,7 @@ ACTOR Future<Void> storageServerTracker(
state Future<Void> metricsTracker = serverMetricsPolling( server );
state Future<std::pair<StorageServerInterface, ProcessClass>> interfaceChanged = server->onInterfaceChanged;
state Future<KeyValueStoreType> storeTracker = keyValueStoreTypeTracker( self, server );
state Future<Void> storeTracker = keyValueStoreTypeTracker( self, server );
state bool hasWrongStoreTypeOrDC = false;
state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2;
@ -3527,13 +3547,7 @@ ACTOR Future<Void> storageServerTracker(
when( wait( otherChanges.empty() ? Never() : quorum( otherChanges, 1 ) ) ) {
TraceEvent("SameAddressChangedStatus", self->distributorId).detail("ServerID", server->id);
}
when( KeyValueStoreType type = wait( storeTracker ) ) {
TraceEvent("KeyValueStoreTypeChanged", self->distributorId)
.detail("ServerID", server->id)
.detail("StoreType", type.toString())
.detail("DesiredType", self->configuration.storageServerStoreType.toString())
.detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy,
server->lastKnownInterface.locality));
when( wait( storeTracker ) ) {
TEST(true); //KeyValueStore type changed
storeTracker = Never();

View File

@ -991,8 +991,16 @@ void ILogSystem::BufferedCursor::advanceTo(LogMessageVersion n) {
}
ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference<ILogSystem::IPeekCursor> cursor, Version maxVersion, TaskPriority taskID ) {
if(cursor->version().version >= maxVersion) {
return Void();
}
loop {
wait(yield());
wait(cursor->getMore(taskID));
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
if(self->canDiscardPopped) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
}
if(cursor->version().version >= maxVersion) {
return Void();
}
@ -1003,11 +1011,6 @@ ACTOR Future<Void> bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Refe
return Void();
}
}
wait(cursor->getMore(taskID));
self->poppedVersion = std::max(self->poppedVersion, cursor->popped());
if(self->canDiscardPopped) {
self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped());
}
}
}