Backport changes to OldTLogServer_6_2

This commit is contained in:
Jingyu Zhou 2019-10-14 14:34:17 -07:00
parent f21d7ca44c
commit 56f40a978e
1 changed files with 24 additions and 31 deletions

View File

@ -1121,7 +1121,7 @@ void commitMessages( TLogData* self, Reference<LogData> logData, Version version
for(auto& msg : taggedMessages) {
if(msg.message.size() > block.capacity() - block.size()) {
logData->messageBlocks.push_back( std::make_pair(version, block) );
logData->messageBlocks.emplace_back(version, block);
addedBytes += int64_t(block.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR;
block = Standalone<VectorRef<uint8_t>>();
block.reserve(block.arena(), std::max<int64_t>(SERVER_KNOBS->TLOG_MESSAGE_BLOCK_BYTES, msgSize));
@ -1156,7 +1156,7 @@ void commitMessages( TLogData* self, Reference<LogData> logData, Version version
}
if (version >= tagData->popped) {
tagData->versionMessages.push_back(std::make_pair(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size()))));
tagData->versionMessages.emplace_back(version, LengthPrefixedStringRef((uint32_t*)(block.end() - msg.message.size())));
if(tagData->versionMessages.back().second.expectedSize() > SERVER_KNOBS->MAX_MESSAGE_SIZE) {
TraceEvent(SevWarnAlways, "LargeMessage").detail("Size", tagData->versionMessages.back().second.expectedSize());
}
@ -1176,7 +1176,7 @@ void commitMessages( TLogData* self, Reference<LogData> logData, Version version
msgSize -= msg.message.size();
}
logData->messageBlocks.push_back( std::make_pair(version, block) );
logData->messageBlocks.emplace_back(version, block);
addedBytes += int64_t(block.size()) * SERVER_KNOBS->TLOG_MESSAGE_BLOCK_OVERHEAD_FACTOR;
addedBytes += overheadBytes;
@ -1236,9 +1236,14 @@ ACTOR Future<Void> tLogPopCore( TLogData* self, Tag inputTag, Version to, Refere
}
state Version upTo = to;
int8_t tagLocality = inputTag.locality;
if (logData->logSystem->get().isValid() && logData->logSystem->get()->isPseudoLocality(tagLocality)) {
upTo = logData->logSystem->get()->popPseudoLocalityTag(tagLocality, to);
tagLocality = tagLocalityLogRouter;
if (isPseudoLocality(tagLocality)) {
if (logData->logSystem->get().isValid()) {
upTo = logData->logSystem->get()->popPseudoLocalityTag(inputTag, to);
tagLocality = tagLocalityLogRouter;
} else {
TraceEvent("TLogPopNoLogSystem", self->dbgid).detail("Locality", tagLocality).detail("Version", upTo);
return Void();
}
}
state Tag tag(tagLocality, inputTag.id);
auto tagData = logData->getTagData(tag);
@ -1516,7 +1521,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if (sd.version >= req.begin) {
firstVersion = std::min(firstVersion, sd.version);
const IDiskQueue::location end = sd.start.lo + sd.length;
commitLocations.push_back( std::make_pair(sd.start, end) );
commitLocations.emplace_back(sd.start, end);
// This isn't perfect, because we aren't accounting for page boundaries, but should be
// close enough.
commitBytes += sd.length;
@ -1850,28 +1855,16 @@ ACTOR Future<Void> rejoinMasters( TLogData* self, TLogInterface tli, DBRecoveryC
} else {
isDisplaced = isDisplaced && ( ( inf.recoveryCount > recoveryCount && inf.recoveryState != RecoveryState::UNINITIALIZED ) || ( inf.recoveryCount == recoveryCount && inf.recoveryState == RecoveryState::FULLY_RECOVERED ) );
}
if(isDisplaced) {
for(auto& log : inf.logSystemConfig.tLogs) {
if( std::count( log.tLogs.begin(), log.tLogs.end(), tli.id() ) ) {
isDisplaced = false;
break;
}
}
}
if(isDisplaced) {
for(auto& old : inf.logSystemConfig.oldTLogs) {
for(auto& log : old.tLogs) {
if( std::count( log.tLogs.begin(), log.tLogs.end(), tli.id() ) ) {
isDisplaced = false;
break;
}
}
}
}
if ( isDisplaced )
{
TraceEvent("TLogDisplaced", tli.id()).detail("Reason", "DBInfoDoesNotContain").detail("RecoveryCount", recoveryCount).detail("InfRecoveryCount", inf.recoveryCount).detail("RecoveryState", (int)inf.recoveryState)
.detail("LogSysConf", describe(inf.logSystemConfig.tLogs)).detail("PriorLogs", describe(inf.priorCommittedLogServers)).detail("OldLogGens", inf.logSystemConfig.oldTLogs.size());
isDisplaced = isDisplaced && !inf.logSystemConfig.hasTLog(tli.id());
if (isDisplaced) {
TraceEvent("TLogDisplaced", tli.id())
.detail("Reason", "DBInfoDoesNotContain")
.detail("RecoveryCount", recoveryCount)
.detail("InfRecoveryCount", inf.recoveryCount)
.detail("RecoveryState", (int)inf.recoveryState)
.detail("LogSysConf", describe(inf.logSystemConfig.tLogs))
.detail("PriorLogs", describe(inf.priorCommittedLogServers))
.detail("OldLogGens", inf.logSystemConfig.oldTLogs.size());
if (BUGGIFY) wait( delay( SERVER_KNOBS->BUGGIFY_WORKER_REMOVED_MAX_LAG * deterministicRandom()->random01() ) );
throw worker_removed();
}
@ -2258,7 +2251,7 @@ ACTOR Future<Void> pullAsyncData( TLogData* self, Reference<LogData> logData, st
}
}
messages.push_back( TagsAndMessage(r->getMessageWithTags(), r->getTags()) );
messages.emplace_back(r->getMessageWithTags(), r->getTags());
r->nextMessage();
}
@ -2449,7 +2442,7 @@ ACTOR Future<Void> restorePersistentState( TLogData* self, LocalityData locality
logData->recoveryCount = BinaryReader::fromStringRef<DBRecoveryCount>( fRecoverCounts.get()[idx].value, Unversioned() );
logData->removed = rejoinMasters(self, recruited, logData->recoveryCount, registerWithMaster.getFuture(), false);
removed.push_back(errorOr(logData->removed));
logsByVersion.push_back(std::make_pair(ver, id1));
logsByVersion.emplace_back(ver, id1);
TraceEvent("TLogPersistentStateRestore", self->dbgid).detail("LogId", logData->logId).detail("Ver", ver);
// Restore popped keys. Pop operations that took place after the last (committed) updatePersistentDataVersion might be lost, but