From 237f0c35cd1c83de7c0c896a7e8b4cd0b00611e9 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 12 Feb 2020 10:02:27 -0800 Subject: [PATCH] Add mutations state variable to hold on to memory --- fdbserver/BackupWorker.actor.cpp | 121 ++++++++++++------------------- 1 file changed, 47 insertions(+), 74 deletions(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 8105e01e5f..bfd27ff041 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -44,6 +44,25 @@ struct VersionedMessage { : version(v), message(m), tags(t), arena(a) {} const Version getVersion() const { return version.version; } const uint32_t getSubVersion() const { return version.sub; } + + // Returns true if the message is a mutation that should be backuped, i.e., + // either key is not in system key space or is not a metadataVersionKey. + bool isBackupMessage(MutationRef* m) const { + for (Tag tag : tags) { + if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) { + return false; // skip Txs mutations + } + } + + BinaryReader reader(message.begin(), message.size(), AssumeVersion(currentProtocolVersion)); + + // Return false for LogProtocolMessage. + if (LogProtocolMessage::isNextIn(reader)) return false; + + reader >> *m; +TraceEvent("KeyDebug").detail("M", m->toString()); + return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey; + } }; struct BackupData { @@ -317,30 +336,6 @@ ACTOR Future saveProgress(BackupData* self, Version backupVersion) { } } -// Returns true if the message is a mutation that should be backuped, i.e., -// either key is not in system key space or is not a metadataVersionKey. -bool isBackupMessage(const VersionedMessage& msg, MutationRef* m) { - for (Tag tag : msg.tags) { - if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) { - return false; // skip Txs mutations - } - } - - BinaryReader reader(msg.message.begin(), msg.message.size(), AssumeVersion(currentProtocolVersion)); - - // Return false for LogProtocolMessage. - if (LogProtocolMessage::isNextIn(reader)) return false; - - reader >> *m; - - // check for metadataVersionKey and special metadata mutations - if (!normalKeys.contains(m->param1) && m->param1 != metadataVersionKey) { - return false; - } - - return true; -} - // Return a block of contiguous padding bytes, growing if needed. static Value makePadding(int size) { static Value pad; @@ -352,28 +347,19 @@ static Value makePadding(int size) { return pad.substr(0, size); } -ACTOR Future addMutation(BackupData* self, Reference logFile, int idx, MutationRef m, - int64_t* blockEnd, int blockSize, bool useM) { - state Standalone keep; - state StringRef message; - - if (useM) { - BinaryWriter wr(AssumeVersion(currentProtocolVersion)); - wr << m; - keep = wr.toValue(); - message = keep; - } else { - message = self->messages[idx].message; - } - state int bytes = sizeof(Version) + sizeof(uint32_t) + sizeof(int) + message.size(); +// Write a mutation to a log file. Note the mutation can be different from +// message.message for clear mutations. +ACTOR Future addMutation(Reference logFile, VersionedMessage message, StringRef mutation, + int64_t* blockEnd, int blockSize) { + state int bytes = sizeof(Version) + sizeof(uint32_t) + sizeof(int) + mutation.size(); // Convert to big Endianness for version.version, version.sub, and msgSize // The decoder assumes 0xFF is the end, so little endian can easily be // mistaken as the end. In contrast, big endian for version almost guarantee // the first byte is not 0xFF (should always be 0x00). BinaryWriter wr(Unversioned()); - wr << bigEndian64(self->messages[idx].version.version) << bigEndian32(self->messages[idx].version.sub) - << bigEndian32(message.size()); + wr << bigEndian64(message.version.version) << bigEndian32(message.version.sub) + << bigEndian32(mutation.size()); state Standalone header = wr.toValue(); // Start a new block if needed @@ -390,7 +376,7 @@ ACTOR Future addMutation(BackupData* self, Reference logFile, } wait(logFile->append((void*)header.begin(), header.size())); - wait(logFile->append(message.begin(), message.size())); + wait(logFile->append(mutation.begin(), mutation.size())); return Void(); } @@ -404,7 +390,8 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int state std::vector blockEnds; state std::set activeUids; // active Backups' UIDs state KeyRangeMap> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds - state MutationRef m; + state std::vector> mutations; + state int idx; for (auto it = self->backups.begin(); it != self->backups.end();) { if (!it->second.isRunning()) { @@ -425,6 +412,12 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id)); it++; } + if (activeUids.empty()) { + // stop early if there is no active backups + self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg); + TraceEvent("BackupWorkerSkip", self->myId).detail("Count", numMsg); + return Void(); + } keyRangeMap.coalesce(allKeys); wait(waitForAll(logFileFutures)); @@ -437,31 +430,37 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int .detail("File", file->getFileName()); } - state int idx = 0; blockEnds = std::vector(logFiles.size(), 0); - for (; idx < numMsg; idx++) { - if (!isBackupMessage(self->messages[idx], &m)) continue; + for (idx = 0; idx < numMsg; idx++) { + auto& message = self->messages[idx]; + MutationRef m; + if (!message.isBackupMessage(&m)) continue; std::vector> adds; if (m.type != MutationRef::Type::ClearRange) { for (int index : keyRangeMap[m.param1]) { - adds.push_back(addMutation(self, logFiles[index], idx, m, &blockEnds[index], blockSize, false)); + adds.push_back(addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize)); } } else { KeyRangeRef mutationRange(m.param1, m.param2); - KeyRangeRef intersectionRange; + KeyRange intersectionRange; // Find intersection ranges and create mutations for sub-ranges for (auto range : keyRangeMap.intersectingRanges(mutationRange)) { const auto& subrange = range.range(); intersectionRange = mutationRange & subrange; MutationRef subm(MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end); + BinaryWriter wr(AssumeVersion(currentProtocolVersion)); + wr << subm; + mutations.push_back(wr.toValue()); for (int index : range.value()) { - adds.push_back(addMutation(self, logFiles[index], idx, subm, &blockEnds[index], blockSize, true)); + adds.push_back( + addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize)); } } } wait(waitForAll(adds)); + mutations.clear(); } self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg); @@ -692,29 +691,3 @@ ACTOR Future backupWorker(BackupInterface interf, InitializeBackupRequest return Void(); } -#include "flow/UnitTest.h" - -TEST_CASE("/BackupWorker/Range") { - KeyRangeMap> rangeMap; - - for (int index : rangeMap[LiteralStringRef("1")]) { - ASSERT(false); - printf("index %d\n", index); - } - - for (auto& logRange : rangeMap.modify(normalKeys)) { - logRange->value().insert(1); - } - for (auto& logRange : rangeMap.modify(singleKeyRange(metadataVersionKey))) { - logRange->value().insert(1); - } - for (auto& logRange : rangeMap.modify(KeyRange(KeyRangeRef(LiteralStringRef("0"), LiteralStringRef("5"))))) { - logRange->value().insert(2); - } - // rangeMap.coalesce(allKeys); - - for (int index : rangeMap[LiteralStringRef("1")]) { - printf("index %d\n", index); - } - return Void(); -} \ No newline at end of file