fix simulation failures

This commit is contained in:
Dan Lambright 2021-11-30 09:04:12 -05:00
parent faef404279
commit 0222d8669d
9 changed files with 102 additions and 79 deletions

View File

@ -38,7 +38,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( MAX_COMMIT_BATCH_INTERVAL, 2.0 ); if( randomize && BUGGIFY ) MAX_COMMIT_BATCH_INTERVAL = 0.5; // Each commit proxy generates a CommitTransactionBatchRequest at least this often, so that versions always advance smoothly
MAX_COMMIT_BATCH_INTERVAL = std::min(MAX_COMMIT_BATCH_INTERVAL, MAX_READ_TRANSACTION_LIFE_VERSIONS/double(2*VERSIONS_PER_SECOND)); // Ensure that the proxy commits 2 times every MAX_READ_TRANSACTION_LIFE_VERSIONS, otherwise the master will not give out versions fast enough
init( ENABLE_VERSION_VECTOR, true );
init( ENABLE_VERSION_VECTOR_TLOG_UNICAST, false );
init( ENABLE_VERSION_VECTOR_TLOG_UNICAST, true );
// TLogs
init( TLOG_TIMEOUT, 0.4 ); //cannot buggify because of availability
@ -414,7 +414,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( TXN_STATE_SEND_AMOUNT, 4 );
init( REPORT_TRANSACTION_COST_ESTIMATION_DELAY, 0.1 );
init( PROXY_REJECT_BATCH_QUEUED_TOO_LONG, true );
init( PROXY_USE_RESOLVER_PRIVATE_MUTATIONS, true ); //if( randomize && BUGGIFY ) PROXY_USE_RESOLVER_PRIVATE_MUTATIONS = deterministicRandom()->coinflip();
init( PROXY_USE_RESOLVER_PRIVATE_MUTATIONS, true ); if( !ENABLE_VERSION_VECTOR_TLOG_UNICAST && randomize && BUGGIFY ) PROXY_USE_RESOLVER_PRIVATE_MUTATIONS = deterministicRandom()->coinflip();
init( RESET_MASTER_BATCHES, 200 );
init( RESET_RESOLVER_BATCHES, 200 );

View File

@ -980,6 +980,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
// TraceEvent("ResolverReturn").detail("ReturnTags",reply.writtenTags).detail("TPCVsize",reply.tpcvMap.size()).detail("ReqTags",self->writtenTagsPreResolution);
self->tpcvMap = reply.tpcvMap;
}
self->toCommit.addWrittenTags(reply.writtenTags);
}
self->lockedKey = pProxyCommitData->txnStateStore->readValue(databaseLockedKey).get();

View File

@ -770,6 +770,8 @@ struct LogPushData : NonCopyable {
}
}
void addWrittenTags(const std::set<Tag>& tags) { written_tags.insert(tags.begin(), tags.end()); }
void getLocations(const std::set<Tag>& tags, std::set<uint16_t>& writtenTLogs) {
std::vector<Tag> vtags(tags.begin(), tags.end());
std::vector<int> msg_locations;

View File

@ -286,10 +286,8 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
reply.privateMutations.push_back(reply.arena, mutations);
reply.arena.dependsOn(mutations.arena());
}
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
// merge mutation tags with sent client tags
toCommit.saveTags(reply.writtenTags);
}
// merge mutation tags with sent client tags
toCommit.saveTags(reply.writtenTags);
reply.privateMutationCount = toCommit.getMutationCount();
}
@ -350,20 +348,24 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
}
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
std::set<uint16_t> writtenTLogs;
if (reply.privateMutationCount) {
for (int i = 0; i < self->numLogs; i++) {
writtenTLogs.insert(i);
}
if (!self->numLogs) {
reply.tpcvMap.clear();
} else {
toCommit.getLocations(reply.writtenTags, writtenTLogs);
}
if (self->tpcvVector[0] == invalidVersion) {
std::fill(self->tpcvVector.begin(), self->tpcvVector.end(), req.prevVersion);
}
for (uint16_t tLog : writtenTLogs) {
reply.tpcvMap[tLog] = self->tpcvVector[tLog];
self->tpcvVector[tLog] = req.version;
std::set<uint16_t> writtenTLogs;
if (reply.privateMutationCount) {
for (int i = 0; i < self->numLogs; i++) {
writtenTLogs.insert(i);
}
} else {
toCommit.getLocations(reply.writtenTags, writtenTLogs);
}
if (self->tpcvVector[0] == invalidVersion) {
std::fill(self->tpcvVector.begin(), self->tpcvVector.end(), req.prevVersion);
}
for (uint16_t tLog : writtenTLogs) {
reply.tpcvMap[tLog] = self->tpcvVector[tLog];
self->tpcvVector[tLog] = req.version;
}
}
}
self->version.set(req.version);

View File

@ -3310,13 +3310,14 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
if (recovering) {
logData->unrecoveredBefore = req.startVersion;
state Version recoverAt = req.recoverAt;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
if (req.rvLogs.find(self->dbgid) != req.rvLogs.end()) {
req.recoverAt = req.rvLogs[self->dbgid];
// TraceEvent("TLogUnicastRecovered").detail("U", req.recoverAt);
recoverAt = req.rvLogs[self->dbgid];
// TraceEvent("TLogUnicastRecovered").detail("U", recoverAt);
}
}
logData->recoveredAt = req.recoverAt;
logData->recoveredAt = recoverAt;
logData->knownCommittedVersion = req.startVersion - 1;
logData->persistentDataVersion = logData->unrecoveredBefore - 1;
logData->persistentDataDurableVersion = logData->unrecoveredBefore - 1;
@ -3328,7 +3329,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
TraceEvent("TLogRecover", self->dbgid)
.detail("LogId", logData->logId)
.detail("At", req.recoverAt)
.detail("At", recoverAt)
.detail("Known", req.knownCommittedVersion)
.detail("Unrecovered", logData->unrecoveredBefore)
.detail("Tags", describe(req.recoverTags))
@ -3345,28 +3346,32 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
self->newLogData.trigger();
if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && !logData->stopped &&
logData->unrecoveredBefore <= req.recoverAt) {
logData->unrecoveredBefore <= recoverAt) {
if (req.recoverFrom.logRouterTags > 0 && req.locality != tagLocalitySatellite) {
logData->logRouterPopToVersion = req.recoverAt;
logData->logRouterPopToVersion = recoverAt;
std::vector<Tag> tags;
tags.push_back(logData->remoteTag);
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, req.recoverAt, true) ||
wait(pullAsyncData(self, logData, tags, logData->unrecoveredBefore, recoverAt, true) ||
logData->removed || logData->stopCommit.onTrigger());
} else if (!req.recoverTags.empty()) {
ASSERT(logData->unrecoveredBefore > req.knownCommittedVersion);
wait(pullAsyncData(
self, logData, req.recoverTags, req.knownCommittedVersion + 1, req.recoverAt, false) ||
self, logData, req.recoverTags, req.knownCommittedVersion + 1, recoverAt, false) ||
logData->removed || logData->stopCommit.onTrigger());
}
pulledRecoveryVersions = true;
logData->knownCommittedVersion = req.recoverAt;
logData->knownCommittedVersion = recoverAt;
}
if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) && logData->version.get() < req.recoverAt &&
!logData->stopped) {
state Version lastVersionPrevEpoch = req.recoverAt;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
lastVersionPrevEpoch = req.maxRv;
}
if ((req.isPrimary || req.recoverFrom.logRouterTags == 0) &&
logData->version.get() < lastVersionPrevEpoch && !logData->stopped) {
// Log the changes to the persistent queue, to be committed by commitQueue()
TLogQueueEntryRef qe;
qe.version = req.recoverAt;
qe.version = lastVersionPrevEpoch;
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
@ -3376,8 +3381,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
if (self->diskQueueCommitBytes > SERVER_KNOBS->MAX_QUEUE_COMMIT_BYTES) {
self->largeDiskQueueCommitBytes.set(true);
}
logData->version.set(req.recoverAt);
logData->version.set(lastVersionPrevEpoch);
}
if (logData->recoveryComplete.isSet()) {

View File

@ -1679,6 +1679,9 @@ Future<Void> TagPartitionedLogSystem::onLogSystemConfigChange() {
// Returns maxRv + 1 when ENABLE_VERSION_VECTOR_TLOG_UNICAST is set, otherwise recoverAt + 1.
// Asserts that recoverAt is present in both modes.
Version TagPartitionedLogSystem::getEnd() const {
	ASSERT(recoverAt.present());
	const bool unicast = SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST;
	// recoverAt.get() is only evaluated when the unicast knob is off, exactly as before.
	const Version lastVersion = unicast ? maxRv : recoverAt.get();
	return lastVersion + 1;
}
@ -1829,7 +1832,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::monitorLog(Reference<AsyncVar<Option
}
}
Optional<std::tuple<Version, Version, std::map<UID, Version>>> TagPartitionedLogSystem::getDurableVersion(
Optional<std::tuple<Version, Version, std::vector<TLogLockResult>>> TagPartitionedLogSystem::getDurableVersion(
UID dbgid,
LogLockInfo lockInfo,
std::vector<Reference<AsyncVar<bool>>> failed,
@ -1916,40 +1919,14 @@ Optional<std::tuple<Version, Version, std::map<UID, Version>>> TagPartitionedLog
.detail("KnownCommittedVersion", knownCommittedVersion)
.detail("EpochEnd", lockInfo.epochEnd);
std::map<UID, Version> rvPerTLog;
std::unordered_map<Version, int> versionRepCount;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
for (auto& result : results) {
rvPerTLog[result.id] = results[new_safe_range_begin].end;
for (int i = 0; i < result.unknownCommittedVersions.size(); i++) {
Version k = std::get<0>(result.unknownCommittedVersions[i]);
if (k <= results[new_safe_range_begin].end) {
continue;
}
versionRepCount[k]++;
}
}
int toleratedFailues = logSet->logServers.size() - requiredCount;
for (auto& result : results) {
for (int i = 0; i < result.unknownCommittedVersions.size(); i++) {
Version k = std::get<0>(result.unknownCommittedVersions[i]);
if (versionRepCount[k] >= std::get<1>(result.unknownCommittedVersions[i]) - toleratedFailues) {
rvPerTLog[result.id] = k;
}
}
}
for (auto& result : results) {
TraceEvent("GetDurableResultLog").detail("LOG", result.id).detail("RV", rvPerTLog[result.id]);
}
}
return std::make_tuple(knownCommittedVersion, results[new_safe_range_begin].end, rvPerTLog);
return std::make_tuple(knownCommittedVersion, results[new_safe_range_begin].end, results);
}
}
TraceEvent("GetDurableResultWaiting", dbgid)
.detail("Required", requiredCount)
.detail("Present", results.size())
.detail("ServerState", sServerState);
return Optional<std::tuple<Version, Version, std::map<UID, Version>>>();
return Optional<std::tuple<Version, Version, std::vector<TLogLockResult>>>();
}
ACTOR Future<Void> TagPartitionedLogSystem::getDurableVersionChanged(LogLockInfo lockInfo,
@ -2230,7 +2207,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
Version minEnd = std::numeric_limits<Version>::max();
Version maxEnd = 0;
std::vector<Future<Void>> changes;
state std::map<UID, Version> rvLogs;
std::vector<TLogLockResult> results;
for (int log = 0; log < logServers.size(); log++) {
if (!logServers[log]->isLocal) {
continue;
@ -2239,7 +2216,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
TagPartitionedLogSystem::getDurableVersion(dbgid, lockResults[log], logFailed[log], lastEnd);
if (versions.present()) {
knownCommittedVersion = std::max(knownCommittedVersion, std::get<0>(versions.get()));
rvLogs.merge(std::get<2>(versions.get()));
results.insert(results.end(), std::get<2>(versions.get()).begin(), std::get<2>(versions.get()).end());
maxEnd = std::max(maxEnd, std::get<1>(versions.get()));
minEnd = std::min(minEnd, std::get<1>(versions.get()));
}
@ -2250,18 +2227,47 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
auto logSystem = makeReference<TagPartitionedLogSystem>(dbgid, locality, prevState.recoveryCount);
// If UNICAST is set, keep refreshing list TLog's recovery versions as maxEnd will vary
if (!SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
lastEnd = minEnd;
logSystem->recoverAt = minEnd;
} else {
logSystem->recoverAt = invalidVersion;
for (auto const& [key, val] : rvLogs) {
if (val > logSystem->recoverAt.get()) {
logSystem->recoverAt = val;
// If UNICAST is set, keep refreshing the list of TLog recovery versions, since maxEnd may change as new
// tLogs are locked.
logSystem->recoverAt = minEnd;
logSystem->maxRv = minEnd;
std::map<UID, Version> rvLogs;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
std::unordered_map<Version, int> versionRepCount;
for (auto& result : results) {
for (int i = 0; i < result.unknownCommittedVersions.size(); i++) {
Version k = std::get<0>(result.unknownCommittedVersions[i]);
versionRepCount[k]++;
}
}
for (auto& result : results) {
for (int i = 0; i < result.unknownCommittedVersions.size(); i++) {
Version k = std::get<0>(result.unknownCommittedVersions[i]);
Reference<LogSet> logSet = lockResults[0].logSet;
if (versionRepCount[k] >=
std::get<1>(result.unknownCommittedVersions[i]) - logSet->tLogReplicationFactor + 1) {
if (k > rvLogs[result.id]) {
rvLogs[result.id] = k;
}
}
}
}
for (auto const& [key, val] : rvLogs) {
if (val > logSystem->maxRv) {
logSystem->maxRv = val;
// TraceEvent("RecoveryVersionInfo").detail("MaxRv", logSystem->maxRv);
}
}
for (auto& result : results) {
rvLogs[result.id] = logSystem->maxRv;
}
} else {
lastEnd = minEnd;
}
logSystem->rvLogs = rvLogs;
logSystem->tLogs = logServers;
logSystem->logRouterTags = prevState.logRouterTags;
@ -2610,7 +2616,6 @@ ACTOR Future<Void> TagPartitionedLogSystem::newRemoteEpoch(TagPartitionedLogSyst
for (int i = 0; i < remoteWorkers.remoteTLogs.size(); i++) {
InitializeTLogRequest& req = remoteTLogReqs[i];
// req.rvLogs = logSystem->rvLogs;
req.recruitmentID = self->recruitmentID;
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
@ -2886,6 +2891,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
req.recoverFrom = oldLogSystemConfig;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
req.rvLogs = oldLogSystem->rvLogs;
req.maxRv = oldLogSystem->maxRv;
}
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
@ -2948,7 +2954,6 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
for (int i = 0; i < recr.satelliteTLogs.size(); i++) {
InitializeTLogRequest& req = sreqs[i];
// req.rvLogs = logSystem->rvLogs;
req.recruitmentID = logSystem->recruitmentID;
req.logVersion = configuration.tLogVersion;
req.storeType = configuration.tLogDataStoreType;
@ -2956,6 +2961,7 @@ ACTOR Future<Reference<ILogSystem>> TagPartitionedLogSystem::newEpoch(
req.recoverFrom = oldLogSystemConfig;
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
req.rvLogs = oldLogSystem->rvLogs;
req.maxRv = oldLogSystem->maxRv;
}
req.recoverAt = oldLogSystem->recoverAt.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;

View File

@ -105,7 +105,8 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
Optional<Version> recoveredAt;
Version knownCommittedVersion;
Version backupStartVersion = invalidVersion; // max(tLogs[0].startVersion, previous epochEnd).
std::map<UID, Version> rvLogs; // log system to list of recovery versions per tlog
Version maxRv;
std::map<UID, Version> rvLogs; // recovery versions per tlog
LocalityData locality;
// For each currently running popFromLog actor, outstandingPops is
// (logID, tag)->(max popped version, durableKnownCommittedVersion).
@ -125,7 +126,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
LogEpoch e,
Optional<PromiseStream<Future<Void>>> addActor = Optional<PromiseStream<Future<Void>>>())
: dbgid(dbgid), logSystemType(LogSystemType::empty), expectedLogSets(0), logRouterTags(0), txsTags(0),
repopulateRegionAntiQuorum(0), stopped(false), epoch(e), oldestBackupEpoch(0),
repopulateRegionAntiQuorum(0), stopped(false), epoch(e), oldestBackupEpoch(0), maxRv(0),
recoveryCompleteWrittenToCoreState(false), remoteLogsWrittenToCoreState(false), hasRemoteServers(false),
locality(locality), addActor(addActor), popActors(false) {}
@ -310,7 +311,7 @@ struct TagPartitionedLogSystem final : ILogSystem, ReferenceCounted<TagPartition
ACTOR static Future<Void> monitorLog(Reference<AsyncVar<OptionalInterface<TLogInterface>>> logServer,
Reference<AsyncVar<bool>> failed);
Optional<std::tuple<Version, Version, std::map<UID, Version>>> static getDurableVersion(
Optional<std::tuple<Version, Version, std::vector<TLogLockResult>>> static getDurableVersion(
UID dbgid,
LogLockInfo lockInfo,
std::vector<Reference<AsyncVar<bool>>> failed = std::vector<Reference<AsyncVar<bool>>>(),
@ -381,4 +382,4 @@ std::vector<T> TagPartitionedLogSystem::getReadyNonError(std::vector<Future<T>>
}
#include "flow/unactorcompiler.h"
#endif // FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_H
#endif // FDBSERVER_TAGPARTITIONEDLOGSYSTEM_ACTOR_H

View File

@ -486,6 +486,7 @@ struct InitializeTLogRequest {
UID recruitmentID;
LogSystemConfig recoverFrom;
Version recoverAt;
Version maxRv;
Version knownCommittedVersion;
LogEpoch epoch;
std::vector<Tag> recoverTags;
@ -511,6 +512,7 @@ struct InitializeTLogRequest {
recruitmentID,
recoverFrom,
recoverAt,
maxRv,
knownCommittedVersion,
epoch,
recoverTags,

View File

@ -824,6 +824,11 @@ ACTOR Future<Void> updateLocalityForDcId(Optional<Key> dcId,
if (ver == invalidVersion) {
ver = oldLogSystem->getKnownCommittedVersion();
}
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
// Do not try to split peeks between data centers in peekTxns() to recover mem kvstore.
// This recovery optimization won't work in UNICAST mode.
loc.first = -1;
}
locality->set(PeekTxsInfo(loc.first, loc.second, ver));
TraceEvent("UpdatedLocalityForDcId")
.detail("DcId", dcId)