Merge pull request #2099 from etschannen/release-6.2
Data distribution could create all possible teams
This commit is contained in:
commit
82d82ad1ab
|
@ -1810,7 +1810,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
// The desired machine team number is not the same with the desired server team number
|
||||
// in notEnoughTeamsForAServer() below, because the machineTeamRemover() does not
|
||||
// remove a machine team with the most number of machine teams.
|
||||
if (m.second->machineTeams.size() < targetMachineTeamNumPerMachine) {
|
||||
if (m.second->machineTeams.size() < targetMachineTeamNumPerMachine && isMachineHealthy(m.second)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -1831,7 +1831,7 @@ struct DDTeamCollection : ReferenceCounted<DDTeamCollection> {
|
|||
int targetTeamNumPerServer = (SERVER_KNOBS->DESIRED_TEAMS_PER_SERVER * (configuration.storageTeamSize + 1)) / 2;
|
||||
ASSERT(targetTeamNumPerServer > 0);
|
||||
for (auto& s : server_info) {
|
||||
if (s.second->teams.size() < targetTeamNumPerServer) {
|
||||
if (s.second->teams.size() < targetTeamNumPerServer && !server_status.get(s.first).isUnhealthy()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,9 +51,9 @@ struct TLogInterface {
|
|||
|
||||
|
||||
TLogInterface() {}
|
||||
explicit TLogInterface(LocalityData locality) : uniqueID( deterministicRandom()->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; }
|
||||
TLogInterface(UID sharedTLogID, LocalityData locality) : uniqueID( deterministicRandom()->randomUniqueID() ), sharedTLogID(sharedTLogID), locality(locality) {}
|
||||
TLogInterface(UID uniqueID, UID sharedTLogID, LocalityData locality) : uniqueID(uniqueID), sharedTLogID(sharedTLogID), locality(locality) {}
|
||||
explicit TLogInterface(const LocalityData& locality) : uniqueID( deterministicRandom()->randomUniqueID() ), locality(locality) { sharedTLogID = uniqueID; }
|
||||
TLogInterface(UID sharedTLogID, const LocalityData& locality) : uniqueID( deterministicRandom()->randomUniqueID() ), sharedTLogID(sharedTLogID), locality(locality) {}
|
||||
TLogInterface(UID uniqueID, UID sharedTLogID, const LocalityData& locality) : uniqueID(uniqueID), sharedTLogID(sharedTLogID), locality(locality) {}
|
||||
UID id() const { return uniqueID; }
|
||||
UID getSharedTLogID() const { return sharedTLogID; }
|
||||
std::string toString() const { return id().shortString(); }
|
||||
|
|
|
@ -434,6 +434,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
Version queueCommittingVersion;
|
||||
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
|
||||
Version queuePoppedVersion;
|
||||
Version minPoppedTagVersion;
|
||||
Tag minPoppedTag;
|
||||
|
||||
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
|
||||
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
|
||||
|
@ -498,6 +500,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, int txsTags, UID recruitmentID, ProtocolVersion protocolVersion, std::vector<Tag> tags) : tLogData(tLogData), knownCommittedVersion(0), logId(interf.id()),
|
||||
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), txsTags(txsTags), recruitmentID(recruitmentID), protocolVersion(protocolVersion),
|
||||
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0), queuePoppedVersion(0), allTags(tags.begin(), tags.end()), terminated(tLogData->terminated.getFuture()),
|
||||
minPoppedTagVersion(0), minPoppedTag(invalidTag),
|
||||
// These are initialized differently on init() or recovery
|
||||
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
|
||||
logRouterPopToVersion(0), locality(tagLocalityInvalid), execOpCommitInProgress(false)
|
||||
|
@ -515,6 +518,8 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
|
|||
specialCounter(cc, "PersistentDataDurableVersion", [this](){ return this->persistentDataDurableVersion; });
|
||||
specialCounter(cc, "KnownCommittedVersion", [this](){ return this->knownCommittedVersion; });
|
||||
specialCounter(cc, "QueuePoppedVersion", [this](){ return this->queuePoppedVersion; });
|
||||
specialCounter(cc, "MinPoppedTagVersion", [this](){ return this->minPoppedTagVersion; });
|
||||
specialCounter(cc, "MinPoppedTag", [this](){ return this->minPoppedTag.toString(); });
|
||||
specialCounter(cc, "SharedBytesInput", [tLogData](){ return tLogData->bytesInput; });
|
||||
specialCounter(cc, "SharedBytesDurable", [tLogData](){ return tLogData->bytesDurable; });
|
||||
specialCounter(cc, "SharedOverheadBytesInput", [tLogData](){ return tLogData->overheadBytesInput; });
|
||||
|
@ -733,12 +738,20 @@ ACTOR Future<Void> popDiskQueue( TLogData* self, Reference<LogData> logData ) {
|
|||
minLocation = locationIter->value.first;
|
||||
minVersion = locationIter->key;
|
||||
}
|
||||
logData->minPoppedTagVersion = std::numeric_limits<Version>::max();
|
||||
|
||||
for(int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
|
||||
for(int tagId = 0; tagId < logData->tag_data[tagLocality].size(); tagId++) {
|
||||
Reference<LogData::TagData> tagData = logData->tag_data[tagLocality][tagId];
|
||||
if (tagData && tagData->tag.locality != tagLocalityTxs && tagData->tag != txsTag && !tagData->nothingPersistent) {
|
||||
minLocation = std::min(minLocation, tagData->poppedLocation);
|
||||
minVersion = std::min(minVersion, tagData->popped);
|
||||
if (tagData && tagData->tag.locality != tagLocalityTxs && tagData->tag != txsTag) {
|
||||
if(!tagData->nothingPersistent) {
|
||||
minLocation = std::min(minLocation, tagData->poppedLocation);
|
||||
minVersion = std::min(minVersion, tagData->popped);
|
||||
}
|
||||
if((!tagData->nothingPersistent || tagData->versionMessages.size()) && tagData->popped < logData->minPoppedTagVersion) {
|
||||
logData->minPoppedTagVersion = tagData->popped;
|
||||
logData->minPoppedTag = tagData->tag;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue