|
|
|
@ -2081,6 +2081,14 @@ void getTLogLocIds(const std::vector<Reference<LogSet>>& tLogs,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Version findMaxKCV(const std::tuple<int, std::vector<TLogLockResult>>& logGroupResults) {
|
|
|
|
|
Version maxKCV = 0;
|
|
|
|
|
for (auto& tLogResult : std::get<1>(logGroupResults)) {
|
|
|
|
|
maxKCV = std::max(maxKCV, tLogResult.knownCommittedVersion);
|
|
|
|
|
}
|
|
|
|
|
return maxKCV;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void populateBitset(boost::dynamic_bitset<>& bs, const std::vector<uint16_t>& ids) {
|
|
|
|
|
for (auto& id : ids) {
|
|
|
|
|
ASSERT(id < bs.size());
|
|
|
|
@ -2092,15 +2100,17 @@ void populateBitset(boost::dynamic_bitset<>& bs, const std::vector<uint16_t>& id
|
|
|
|
|
// This function finds the highest recoverable version for each tLog group over all log groups.
|
|
|
|
|
// All prior versions to the chosen RV must also be recoverable.
|
|
|
|
|
// TODO: unit tests to stress UNICAST
|
|
|
|
|
Version getRecoverVersionUnicast(const std::vector<Reference<LogSet>>& logServers,
|
|
|
|
|
const std::tuple<int, std::vector<TLogLockResult>>& logGroupResults,
|
|
|
|
|
Version minDV,
|
|
|
|
|
Version minKCV) {
|
|
|
|
|
Optional<std::tuple<Version, Version>> getRecoverVersionUnicast(
|
|
|
|
|
const std::vector<Reference<LogSet>>& logServers,
|
|
|
|
|
const std::tuple<int, std::vector<TLogLockResult>>& logGroupResults,
|
|
|
|
|
Version minDV) {
|
|
|
|
|
std::vector<uint16_t> tLogLocIds;
|
|
|
|
|
uint16_t maxTLogLocId; // maximum possible id, not maximum of id's of available log servers
|
|
|
|
|
getTLogLocIds(logServers, logGroupResults, tLogLocIds, maxTLogLocId);
|
|
|
|
|
uint16_t bsSize = maxTLogLocId + 1; // bitset size, used below
|
|
|
|
|
|
|
|
|
|
Version maxKCV = findMaxKCV(logGroupResults);
|
|
|
|
|
|
|
|
|
|
// Summarize the information sent by various tLogs.
|
|
|
|
|
// A bitset of available tLogs
|
|
|
|
|
boost::dynamic_bitset<> availableTLogs(bsSize);
|
|
|
|
@ -2113,14 +2123,14 @@ Version getRecoverVersionUnicast(const std::vector<Reference<LogSet>>& logServer
|
|
|
|
|
uint16_t tLogIdx = 0;
|
|
|
|
|
int replicationFactor = std::get<0>(logGroupResults);
|
|
|
|
|
for (auto& tLogResult : std::get<1>(logGroupResults)) {
|
|
|
|
|
uint16_t tLogLocId = tLogLocIds[tLogIdx++];
|
|
|
|
|
if (tLogResult.unknownCommittedVersions.empty()) {
|
|
|
|
|
return minKCV;
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
uint16_t tLogLocId = tLogLocIds[tLogIdx];
|
|
|
|
|
availableTLogs.set(tLogLocId);
|
|
|
|
|
for (auto& unknownCommittedVersion : tLogResult.unknownCommittedVersions) {
|
|
|
|
|
Version k = unknownCommittedVersion.version;
|
|
|
|
|
if (k > minKCV) {
|
|
|
|
|
if (k > maxKCV) {
|
|
|
|
|
if (versionAvailableTLogs[k].empty()) {
|
|
|
|
|
versionAvailableTLogs[k].resize(bsSize);
|
|
|
|
|
}
|
|
|
|
@ -2132,26 +2142,31 @@ Version getRecoverVersionUnicast(const std::vector<Reference<LogSet>>& logServer
|
|
|
|
|
populateBitset(versionAllTLogs[k], unknownCommittedVersion.tLogLocIds);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
tLogIdx++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (versionAllTLogs.empty()) {
|
|
|
|
|
return std::make_tuple(maxKCV, maxKCV);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Compute recovery version.
|
|
|
|
|
//
|
|
|
|
|
// @note we think that the unicast recovery version should always greater than or
|
|
|
|
|
// equal to "min(DV)" (= "minDV"). To be conservative we use "min(KCV)" (= "minKCV")
|
|
|
|
|
// equal to "min(DV)" (= "minDV"). To be conservative we use "max(KCV)" (= "maxKCV")
|
|
|
|
|
// as the default (starting) recovery version and later verify that the computed
|
|
|
|
|
// recovery version is greater than or equal to "minDV".
|
|
|
|
|
// @todo modify code to use "minDV" as the default (starting) recovery version
|
|
|
|
|
// @todo to be investigated: maybe we should use max(KCV) as the starting recovery
|
|
|
|
|
// version (this is because "known committed version" can advance even after a log
|
|
|
|
|
// server is locked when unicast is enabled and so all log servers may not preserve
|
|
|
|
|
// all committed versions, from min(KCV) till the correct recovery version, in
|
|
|
|
|
// "unknownCommittedVersions" causing the recovery algorithm to not find the correct
|
|
|
|
|
// recovery version)?
|
|
|
|
|
Version RV = minKCV; // recovery version
|
|
|
|
|
std::vector<Version> RVs(maxTLogLocId + 1, minKCV); // recovery versions of various tLogs
|
|
|
|
|
Version prevVersion = minKCV;
|
|
|
|
|
//
|
|
|
|
|
// @note we are not using "min(KCV)" as the default recovery version because "known
|
|
|
|
|
// committed version" can advance even after a log server is locked when unicast is
|
|
|
|
|
// enabled and so all log servers may not preserve all committed versions, from
|
|
|
|
|
// "min(KCV)" till the correct recovery version, in "unknownCommittedVersions" and
|
|
|
|
|
// this may cause the recovery algorithm to not find the correct recovery version.
|
|
|
|
|
//
|
|
|
|
|
// @todo modify code to use "minDV" as the default (starting) recovery version.
|
|
|
|
|
Version RV = maxKCV; // recovery version
|
|
|
|
|
std::vector<Version> RVs(maxTLogLocId + 1, maxKCV); // recovery versions of various tLogs
|
|
|
|
|
Version prevVersion = maxKCV;
|
|
|
|
|
for (auto const& [version, tLogs] : versionAllTLogs) {
|
|
|
|
|
if (!(prevVersion == minKCV || prevVersion == prevVersionMap[version])) {
|
|
|
|
|
if (!(prevVersion == maxKCV || prevVersion == prevVersionMap[version])) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
// This version is not recoverable if there is a log server (LS) such that:
|
|
|
|
@ -2178,7 +2193,8 @@ Version getRecoverVersionUnicast(const std::vector<Reference<LogSet>>& logServer
|
|
|
|
|
prevVersion = version;
|
|
|
|
|
}
|
|
|
|
|
ASSERT_WE_THINK(RV >= minDV && RV != std::numeric_limits<Version>::max());
|
|
|
|
|
return RV;
|
|
|
|
|
ASSERT_WE_THINK(RV >= maxKCV);
|
|
|
|
|
return std::make_tuple(maxKCV, RV);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Reference<ILogSystem>>> outLogSystem,
|
|
|
|
@ -2442,7 +2458,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
|
|
|
|
|
state Version knownCommittedVersion = 0;
|
|
|
|
|
loop {
|
|
|
|
|
Version minEnd = std::numeric_limits<Version>::max();
|
|
|
|
|
Version minKCV = std::numeric_limits<Version>::max();
|
|
|
|
|
Version minDV = std::numeric_limits<Version>::max();
|
|
|
|
|
Version maxEnd = 0;
|
|
|
|
|
state std::vector<Future<Void>> changes;
|
|
|
|
|
state std::vector<std::tuple<int, std::vector<TLogLockResult>>> logGroupResults;
|
|
|
|
@ -2453,15 +2469,17 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
|
|
|
|
|
auto versions =
|
|
|
|
|
TagPartitionedLogSystem::getDurableVersion(dbgid, lockResults[log], logFailed[log], lastEnd);
|
|
|
|
|
if (versions.present()) {
|
|
|
|
|
knownCommittedVersion = std::max(knownCommittedVersion, std::get<0>(versions.get()));
|
|
|
|
|
logGroupResults.emplace_back(logServers[log]->tLogReplicationFactor, std::get<2>(versions.get()));
|
|
|
|
|
maxEnd = std::max(maxEnd, std::get<1>(versions.get()));
|
|
|
|
|
minEnd = std::min(minEnd, std::get<1>(versions.get()));
|
|
|
|
|
minKCV = std::min(minKCV, std::get<0>(versions.get()));
|
|
|
|
|
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
|
|
|
|
|
Version version = getRecoverVersionUnicast(logServers, logGroupResults.back(), minEnd, minKCV);
|
|
|
|
|
maxEnd = std::max(maxEnd, version);
|
|
|
|
|
minEnd = std::min(minEnd, version);
|
|
|
|
|
minDV = std::min(minDV, std::get<1>(versions.get()));
|
|
|
|
|
if (!SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
|
|
|
|
|
knownCommittedVersion = std::max(knownCommittedVersion, std::get<0>(versions.get()));
|
|
|
|
|
maxEnd = std::max(maxEnd, std::get<1>(versions.get()));
|
|
|
|
|
minEnd = std::min(minEnd, std::get<1>(versions.get()));
|
|
|
|
|
} else {
|
|
|
|
|
auto unicastVersions = getRecoverVersionUnicast(logServers, logGroupResults.back(), minDV);
|
|
|
|
|
knownCommittedVersion = std::max(knownCommittedVersion, std::get<0>(unicastVersions.get()));
|
|
|
|
|
maxEnd = std::max(maxEnd, std::get<1>(unicastVersions.get()));
|
|
|
|
|
minEnd = std::min(minEnd, std::get<1>(unicastVersions.get()));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
changes.push_back(TagPartitionedLogSystem::getDurableVersionChanged(lockResults[log], logFailed[log]));
|
|
|
|
|