diff --git a/fdbserver/DataDistribution.actor.cpp b/fdbserver/DataDistribution.actor.cpp index 73891b11f1..001afcbb99 100644 --- a/fdbserver/DataDistribution.actor.cpp +++ b/fdbserver/DataDistribution.actor.cpp @@ -3175,8 +3175,21 @@ ACTOR Future serverMetricsPolling( TCServerInfo *server) { } } -//Returns the KeyValueStoreType of server if it is different from self->storeType -ACTOR Future keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) { +//Returns if the KeyValueStoreType of server is different from self->storeType or the desired datacenter does not match +ACTOR Future keyValueStoreTypeTracker(DDTeamCollection* self, TCServerInfo *server) { + if ((!self->includedDCs.empty() && + std::find(self->includedDCs.begin(), self->includedDCs.end(), server->lastKnownInterface.locality.dcId()) == + self->includedDCs.end()) || + (!self->isValidLocality(self->configuration.storagePolicy, server->lastKnownInterface.locality))) { + TraceEvent("KeyValueStoreTypeChanged", self->distributorId) + .detail("ServerID", server->id) + .detail("StoreType", "?") + .detail("DesiredType", self->configuration.storageServerStoreType.toString()) + .detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy, + server->lastKnownInterface.locality)); + return Void(); + } + state KeyValueStoreType type = wait(brokenPromiseToNever(server->lastKnownInterface.getKeyValueStoreType.getReplyWithTaskID(TaskPriority::DataDistribution))); if (type == self->configuration.storageServerStoreType && (self->includedDCs.empty() || @@ -3186,7 +3199,14 @@ ACTOR Future keyValueStoreTypeTracker(DDTeamCollection* self, wait(Future(Never())); } - return type; + TraceEvent("KeyValueStoreTypeChanged", self->distributorId) + .detail("ServerID", server->id) + .detail("StoreType", type.toString()) + .detail("DesiredType", self->configuration.storageServerStoreType.toString()) + .detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy, + server->lastKnownInterface.locality)); + + return Void(); } ACTOR Future waitForAllDataRemoved( Database cx, UID serverID, Version addedVersion, DDTeamCollection* teams ) { @@ -3302,7 +3322,7 @@ ACTOR Future storageServerTracker( state Future metricsTracker = serverMetricsPolling( server ); state Future> interfaceChanged = server->onInterfaceChanged; - state Future storeTracker = keyValueStoreTypeTracker( self, server ); + state Future storeTracker = keyValueStoreTypeTracker( self, server ); state bool hasWrongStoreTypeOrDC = false; state int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (self->configuration.storageTeamSize + 1)) / 2; @@ -3527,13 +3547,7 @@ ACTOR Future storageServerTracker( when( wait( otherChanges.empty() ? Never() : quorum( otherChanges, 1 ) ) ) { TraceEvent("SameAddressChangedStatus", self->distributorId).detail("ServerID", server->id); } - when( KeyValueStoreType type = wait( storeTracker ) ) { - TraceEvent("KeyValueStoreTypeChanged", self->distributorId) - .detail("ServerID", server->id) - .detail("StoreType", type.toString()) - .detail("DesiredType", self->configuration.storageServerStoreType.toString()) - .detail("IsValidLocality", self->isValidLocality(self->configuration.storagePolicy, - server->lastKnownInterface.locality)); + when( wait( storeTracker ) ) { TEST(true); //KeyValueStore type changed storeTracker = Never(); diff --git a/fdbserver/LogSystemPeekCursor.actor.cpp b/fdbserver/LogSystemPeekCursor.actor.cpp index 98ba5a4bb0..4c4409c0c0 100644 --- a/fdbserver/LogSystemPeekCursor.actor.cpp +++ b/fdbserver/LogSystemPeekCursor.actor.cpp @@ -991,8 +991,16 @@ void ILogSystem::BufferedCursor::advanceTo(LogMessageVersion n) { } ACTOR Future bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Reference cursor, Version maxVersion, TaskPriority taskID ) { + if(cursor->version().version >= maxVersion) { + return Void(); + } loop { wait(yield()); + wait(cursor->getMore(taskID)); + self->poppedVersion = std::max(self->poppedVersion, cursor->popped()); + if(self->canDiscardPopped) { + self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped()); + } if(cursor->version().version >= maxVersion) { return Void(); } @@ -1003,11 +1011,6 @@ ACTOR Future bufferedGetMoreLoader( ILogSystem::BufferedCursor* self, Refe return Void(); } } - wait(cursor->getMore(taskID)); - self->poppedVersion = std::max(self->poppedVersion, cursor->popped()); - if(self->canDiscardPopped) { - self->initialPoppedVersion = std::max(self->initialPoppedVersion, cursor->popped()); - } } }