Compare commits

...

2 Commits

Author SHA1 Message Date
Sreenath Bodagala ac9a56e552 - Extend the disk queue entry data structure to hold the following
information, needed by the recovery algorithm when unicast is enabled:
- PrevVersion of a version
- List of log servers that a commit proxy sends a commit version to

And, extend the code to populate the "unknownCommittedVersions" list (the
list of commit versions whose commit status is not known and is to be
determined by the recovery version computation algorithm) on a log
server restart.

NOTE: These changes will cause version incompatibility, so additional
code/logic will need to be added to make sure that upgrade/restart-related
simulation tests work properly.
2024-10-23 19:36:21 +00:00
Sreenath Bodagala b160bcaad9 - Modify the unicast-based recovery version computation algorithm
in the following ways:

- Make it use "max(KCV)", instead of "min(KCV)", as the default
recovery version
- Make it return the <max(KCV), recovery version> pair as output
- Make the invoker use the "max(KCV)" version returned by the
algorithm
- Address a bug related to the computation of recovery version (in
the case where a subset of log servers have an empty
"unknownCommittedVersions" list)

And, make the invoker compute the max() and min() of recovery versions
of primary and satellite DCs based on the recovery versions computed
by the appropriate (with/without unicast) recovery version computation
algorithms.
2024-10-23 19:17:22 +00:00
2 changed files with 70 additions and 35 deletions

View File

@ -53,17 +53,19 @@ struct TLogQueueEntryRef {
UID id;
Version version;
Version knownCommittedVersion;
Version prevVersion;
std::vector<uint16_t> tLogLocIds;
StringRef messages;
TLogQueueEntryRef() : version(0), knownCommittedVersion(0) {}
TLogQueueEntryRef() : version(0), knownCommittedVersion(0), prevVersion(0) {}
TLogQueueEntryRef(Arena& a, TLogQueueEntryRef const& from)
: id(from.id), version(from.version), knownCommittedVersion(from.knownCommittedVersion),
messages(a, from.messages) {}
messages(a, from.messages), prevVersion(from.prevVersion), tLogLocIds(from.tLogLocIds) {}
// To change this serialization, ProtocolVersion::TLogQueueEntryRef must be updated, and downgrades need to be
// considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, messages, knownCommittedVersion, id);
serializer(ar, version, messages, knownCommittedVersion, prevVersion, tLogLocIds, id);
}
size_t expectedSize() const { return messages.expectedSize(); }
};
@ -73,8 +75,11 @@ struct AlternativeTLogQueueEntryRef {
Version version;
Version knownCommittedVersion;
std::vector<TagsAndMessage>* alternativeMessages;
Version prevVersion;
std::vector<uint16_t> tLogLocIds;
AlternativeTLogQueueEntryRef() : version(0), knownCommittedVersion(0), alternativeMessages(nullptr) {}
AlternativeTLogQueueEntryRef()
: version(0), knownCommittedVersion(0), alternativeMessages(nullptr), prevVersion(0) {}
template <class Ar>
void serialize(Ar& ar) {
@ -84,7 +89,7 @@ struct AlternativeTLogQueueEntryRef {
for (auto& msg : *alternativeMessages) {
ar.serializeBytes(msg.message);
}
serializer(ar, knownCommittedVersion, id);
serializer(ar, knownCommittedVersion, prevVersion, tLogLocIds, id);
}
uint32_t expectedSize() const {
@ -2411,6 +2416,8 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = req.messages;
qe.id = logData->logId;
qe.prevVersion = req.seqPrevVersion;
qe.tLogLocIds = req.tLogLocIds;
self->persistentQueue->push(qe, logData);
self->diskQueueCommitBytes += qe.expectedSize();
@ -3408,6 +3415,16 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
}
logData->knownCommittedVersion =
std::max(logData->knownCommittedVersion, qe.knownCommittedVersion);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds);
while (!logData->unknownCommittedVersions.empty() &&
logData->unknownCommittedVersions.back().version <= logData->knownCommittedVersion) {
logData->unknownCommittedVersions.pop_back();
}
}
if (qe.version > logData->version.get()) {
commitMessages(self, logData, qe.version, qe.arena(), qe.messages);
logData->version.set(qe.version);

View File

@ -2081,6 +2081,14 @@ void getTLogLocIds(const std::vector<Reference<LogSet>>& tLogs,
}
}
// Returns the largest "knownCommittedVersion" found among the lock results of
// one log group (the second element of "logGroupResults"). The result starts
// at 0, so 0 is returned when the group holds no lock results.
Version findMaxKCV(const std::tuple<int, std::vector<TLogLockResult>>& logGroupResults) {
    const std::vector<TLogLockResult>& lockResults = std::get<1>(logGroupResults);
    Version highestKCV = 0;
    for (const auto& lockResult : lockResults) {
        // Keep the running maximum; equal values leave it unchanged.
        if (highestKCV < lockResult.knownCommittedVersion) {
            highestKCV = lockResult.knownCommittedVersion;
        }
    }
    return highestKCV;
}
void populateBitset(boost::dynamic_bitset<>& bs, const std::vector<uint16_t>& ids) {
for (auto& id : ids) {
ASSERT(id < bs.size());
@ -2092,15 +2100,17 @@ void populateBitset(boost::dynamic_bitset<>& bs, const std::vector<uint16_t>& id
// This function finds the highest recoverable version for each tLog group over all log groups.
// All prior versions to the chosen RV must also be recoverable.
// TODO: unit tests to stress UNICAST
Version getRecoverVersionUnicast(const std::vector<Reference<LogSet>>& logServers,
const std::tuple<int, std::vector<TLogLockResult>>& logGroupResults,
Version minDV,
Version minKCV) {
Optional<std::tuple<Version, Version>> getRecoverVersionUnicast(
const std::vector<Reference<LogSet>>& logServers,
const std::tuple<int, std::vector<TLogLockResult>>& logGroupResults,
Version minDV) {
std::vector<uint16_t> tLogLocIds;
uint16_t maxTLogLocId; // maximum possible id, not maximum of id's of available log servers
getTLogLocIds(logServers, logGroupResults, tLogLocIds, maxTLogLocId);
uint16_t bsSize = maxTLogLocId + 1; // bitset size, used below
Version maxKCV = findMaxKCV(logGroupResults);
// Summarize the information sent by various tLogs.
// A bitset of available tLogs
boost::dynamic_bitset<> availableTLogs(bsSize);
@ -2113,14 +2123,14 @@ Version getRecoverVersionUnicast(const std::vector<Reference<LogSet>>& logServer
uint16_t tLogIdx = 0;
int replicationFactor = std::get<0>(logGroupResults);
for (auto& tLogResult : std::get<1>(logGroupResults)) {
uint16_t tLogLocId = tLogLocIds[tLogIdx++];
if (tLogResult.unknownCommittedVersions.empty()) {
return minKCV;
continue;
}
uint16_t tLogLocId = tLogLocIds[tLogIdx];
availableTLogs.set(tLogLocId);
for (auto& unknownCommittedVersion : tLogResult.unknownCommittedVersions) {
Version k = unknownCommittedVersion.version;
if (k > minKCV) {
if (k > maxKCV) {
if (versionAvailableTLogs[k].empty()) {
versionAvailableTLogs[k].resize(bsSize);
}
@ -2132,26 +2142,31 @@ Version getRecoverVersionUnicast(const std::vector<Reference<LogSet>>& logServer
populateBitset(versionAllTLogs[k], unknownCommittedVersion.tLogLocIds);
}
}
tLogIdx++;
}
if (versionAllTLogs.empty()) {
return std::make_tuple(maxKCV, maxKCV);
}
// Compute recovery version.
//
// @note we think that the unicast recovery version should always greater than or
// equal to "min(DV)" (= "minDV"). To be conservative we use "min(KCV)" (= "minKCV")
// equal to "min(DV)" (= "minDV"). To be conservative we use "max(KCV)" (= "maxKCV")
// as the default (starting) recovery version and later verify that the computed
// recovery version is greater than or equal to "minDV".
// @todo modify code to use "minDV" as the default (starting) recovery version
// @todo to be investigated: maybe we should use max(KCV) as the starting recovery
// version (this is because "known committed version" can advance even after a log
// server is locked when unicast is enabled and so all log servers may not preserve
// all committed versions, from min(KCV) till the correct recovery version, in
// "unknownCommittedVersions" causing the recovery algorithm to not find the correct
// recovery version)?
Version RV = minKCV; // recovery version
std::vector<Version> RVs(maxTLogLocId + 1, minKCV); // recovery versions of various tLogs
Version prevVersion = minKCV;
//
// @note we are not using "min(KCV)" as the default recovery version because "known
// committed version" can advance even after a log server is locked when unicast is
// enabled and so all log servers may not preserve all committed versions, from
// "min(KCV)" till the correct recovery version, in "unknownCommittedVersions" and
// this may cause the recovery algorithm to not find the correct recovery version.
//
// @todo modify code to use "minDV" as the default (starting) recovery version.
Version RV = maxKCV; // recovery version
std::vector<Version> RVs(maxTLogLocId + 1, maxKCV); // recovery versions of various tLogs
Version prevVersion = maxKCV;
for (auto const& [version, tLogs] : versionAllTLogs) {
if (!(prevVersion == minKCV || prevVersion == prevVersionMap[version])) {
if (!(prevVersion == maxKCV || prevVersion == prevVersionMap[version])) {
break;
}
// This version is not recoverable if there is a log server (LS) such that:
@ -2178,7 +2193,8 @@ Version getRecoverVersionUnicast(const std::vector<Reference<LogSet>>& logServer
prevVersion = version;
}
ASSERT_WE_THINK(RV >= minDV && RV != std::numeric_limits<Version>::max());
return RV;
ASSERT_WE_THINK(RV >= maxKCV);
return std::make_tuple(maxKCV, RV);
}
ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Reference<ILogSystem>>> outLogSystem,
@ -2442,7 +2458,7 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
state Version knownCommittedVersion = 0;
loop {
Version minEnd = std::numeric_limits<Version>::max();
Version minKCV = std::numeric_limits<Version>::max();
Version minDV = std::numeric_limits<Version>::max();
Version maxEnd = 0;
state std::vector<Future<Void>> changes;
state std::vector<std::tuple<int, std::vector<TLogLockResult>>> logGroupResults;
@ -2453,15 +2469,17 @@ ACTOR Future<Void> TagPartitionedLogSystem::epochEnd(Reference<AsyncVar<Referenc
auto versions =
TagPartitionedLogSystem::getDurableVersion(dbgid, lockResults[log], logFailed[log], lastEnd);
if (versions.present()) {
knownCommittedVersion = std::max(knownCommittedVersion, std::get<0>(versions.get()));
logGroupResults.emplace_back(logServers[log]->tLogReplicationFactor, std::get<2>(versions.get()));
maxEnd = std::max(maxEnd, std::get<1>(versions.get()));
minEnd = std::min(minEnd, std::get<1>(versions.get()));
minKCV = std::min(minKCV, std::get<0>(versions.get()));
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
Version version = getRecoverVersionUnicast(logServers, logGroupResults.back(), minEnd, minKCV);
maxEnd = std::max(maxEnd, version);
minEnd = std::min(minEnd, version);
minDV = std::min(minDV, std::get<1>(versions.get()));
if (!SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
knownCommittedVersion = std::max(knownCommittedVersion, std::get<0>(versions.get()));
maxEnd = std::max(maxEnd, std::get<1>(versions.get()));
minEnd = std::min(minEnd, std::get<1>(versions.get()));
} else {
auto unicastVersions = getRecoverVersionUnicast(logServers, logGroupResults.back(), minDV);
knownCommittedVersion = std::max(knownCommittedVersion, std::get<0>(unicastVersions.get()));
maxEnd = std::max(maxEnd, std::get<1>(unicastVersions.get()));
minEnd = std::min(minEnd, std::get<1>(unicastVersions.get()));
}
}
changes.push_back(TagPartitionedLogSystem::getDurableVersionChanged(lockResults[log], logFailed[log]));