diff --git a/fdbserver/BackupProgress.actor.cpp b/fdbserver/BackupProgress.actor.cpp
index 985fcb7f93..898ce31b70 100644
--- a/fdbserver/BackupProgress.actor.cpp
+++ b/fdbserver/BackupProgress.actor.cpp
@@ -83,21 +83,24 @@ std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgr
         auto progressIt = progress.lower_bound(epoch);
         if (progressIt != progress.end() && progressIt->first == epoch) {
-            if (progressIt != progress.begin()) {
+            std::set<Tag> toCheck = tags;
+            for (auto current = progressIt; current != progress.begin() && !toCheck.empty();) {
+                auto prev = std::prev(current);
                 // Previous epoch is gone, consolidate the progress.
-                auto prev = std::prev(progressIt);
                 for (auto [tag, version] : prev->second) {
-                    if (tags.count(tag) > 0) {
+                    if (toCheck.count(tag) > 0) {
                         progressIt->second[tag] = std::max(version, progressIt->second[tag]);
+                        toCheck.erase(tag);
                     }
                 }
+                current = prev;
             }
             updateTagVersions(&tagVersions, &tags, progressIt->second, info.epochEnd, adjustedBeginVersion, epoch);
         } else {
             auto rit = std::find_if(
                 progress.rbegin(), progress.rend(),
                 [epoch = epoch](const std::pair<LogEpoch, std::map<Tag, Version>>& p) { return p.first < epoch; });
-            if (!(rit == progress.rend())) {
+            while (!(rit == progress.rend())) {
                 // A partial recovery can result in empty epoch that copies previous
                 // epoch's version range. In this case, we should check previous
                 // epoch's savedVersion.
@@ -112,7 +115,9 @@ std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgr
                 // ASSERT(info.logRouterTags == epochTags[rit->first]);
                 updateTagVersions(&tagVersions, &tags, rit->second, info.epochEnd, adjustedBeginVersion, epoch);
+                break;
             }
+            rit++;
         }
     }
diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp
index 58c4f4b682..430bcc82ca 100644
--- a/fdbserver/BackupWorker.actor.cpp
+++ b/fdbserver/BackupWorker.actor.cpp
@@ -34,14 +34,17 @@
 
 #include "flow/actorcompiler.h" // This must be the last #include.
 
+#define SevDebugMemory SevVerbose
+
 struct VersionedMessage {
     LogMessageVersion version;
     StringRef message;
     VectorRef<Tag> tags;
     Arena arena; // Keep a reference to the memory containing the message
+    size_t bytes; // arena's size when inserted, which can grow afterwards
 
     VersionedMessage(LogMessageVersion v, StringRef m, const VectorRef<Tag>& t, const Arena& a)
-      : version(v), message(m), tags(t), arena(a) {}
+      : version(v), message(m), tags(t), arena(a), bytes(a.getSize()) {}
     const Version getVersion() const { return version.version; }
     const uint32_t getSubVersion() const { return version.sub; }
@@ -64,6 +67,10 @@ struct VersionedMessage {
     }
 };
 
+static bool sameArena(const Arena& a, const Arena& b) {
+    return a.impl.getPtr() == b.impl.getPtr();
+}
+
 struct BackupData {
     const UID myId;
     const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
@@ -84,6 +91,7 @@ struct BackupData {
     bool stopped = false;
     bool exitEarly = false; // If the worker is on an old epoch and all backups starts a version >= the endVersion
     AsyncVar<bool> paused; // Track if "backupPausedKey" is set.
+    Reference<FlowLock> lock;
 
     struct PerBackupInfo {
         PerBackupInfo() = default;
@@ -231,12 +239,14 @@ struct BackupData {
       : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
         endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
         minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
-        cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) {
+        cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false),
+        lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)) {
         cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
 
         specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
         specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
         specialCounter(cc, "MsgQ", [this]() { return this->messages.size(); });
+        specialCounter(cc, "BufferedBytes", [this]() { return this->lock->activePermits(); });
         logger = traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc,
                                "BackupWorkerMetrics");
     }
@@ -310,6 +320,34 @@ struct BackupData {
         doneTrigger.trigger();
     }
 
+    // Erases messages and updates lock with memory released.
+    void eraseMessages(int num) {
+        ASSERT(num <= messages.size());
+        if (num == 0) return;
+
+        if (messages.size() == num) {
+            messages.clear();
+            TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId).detail("ReleaseAll", lock->activePermits());
+            lock->release(lock->activePermits());
+            return;
+        }
+
+        // keep track of each arena and accumulate their sizes
+        int64_t bytes = 0;
+        for (int i = 0; i < num; i++) {
+            const Arena& a = messages[i].arena;
+            const Arena& b = messages[i + 1].arena;
+            if (!sameArena(a, b)) {
+                bytes += messages[i].bytes;
+                TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId)
+                    .detail("Release", messages[i].bytes)
+                    .detail("Arena", (void*)a.impl.getPtr());
+            }
+        }
+        lock->release(bytes);
+        messages.erase(messages.begin(), messages.begin() + num);
+    }
+
     void eraseMessagesAfterEndVersion() {
         ASSERT(endVersion.present());
         const Version ver = endVersion.get();
@@ -637,6 +675,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
     state std::vector<Reference<IBackupFile>> logFiles;
     state std::vector<int64_t> blockEnds;
     state std::vector<UID> activeUids; // active Backups' UIDs
+    state std::vector<Version> beginVersions; // logFiles' begin versions
     state KeyRangeMap<std::set<int>> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds
     state std::vector<Standalone<StringRef>> mutations;
     state int idx;
@@ -655,15 +694,20 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
         const int index = logFileFutures.size();
         activeUids.push_back(it->first);
         self->insertRanges(keyRangeMap, it->second.ranges.get(), index);
+
         if (it->second.lastSavedVersion == invalidVersion) {
             if (it->second.startVersion > self->startVersion && !self->messages.empty()) {
                 // True-up first mutation log's begin version
                 it->second.lastSavedVersion = self->messages[0].getVersion();
             } else {
-                it->second.lastSavedVersion =
-                    std::max(self->popVersion, std::max(self->savedVersion, self->startVersion));
+                it->second.lastSavedVersion = std::max({ self->popVersion, self->savedVersion, self->startVersion });
             }
+            TraceEvent("BackupWorkerTrueUp", self->myId).detail("LastSavedVersion", it->second.lastSavedVersion);
         }
+        // The true-up version can be larger than the first message version, so keep
+        // the begin versions for later mutation filtering.
+        beginVersions.push_back(it->second.lastSavedVersion);
+
         logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile(
             it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags));
         it++;
@@ -675,7 +719,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
     std::transform(logFileFutures.begin(), logFileFutures.end(), std::back_inserter(logFiles),
                    [](const Future<Reference<IBackupFile>>& f) { return f.get(); });
 
-    ASSERT(activeUids.size() == logFiles.size());
+    ASSERT(activeUids.size() == logFiles.size() && beginVersions.size() == logFiles.size());
     for (int i = 0; i < logFiles.size(); i++) {
         TraceEvent("OpenMutationFile", self->myId)
             .detail("BackupID", activeUids[i])
@@ -700,7 +744,10 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
         std::vector<Future<Void>> adds;
         if (m.type != MutationRef::Type::ClearRange) {
             for (int index : keyRangeMap[m.param1]) {
-                adds.push_back(addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize));
+                if (message.getVersion() >= beginVersions[index]) {
+                    adds.push_back(
+                        addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize));
+                }
             }
         } else {
             KeyRangeRef mutationRange(m.param1, m.param2);
@@ -715,8 +762,10 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
                 wr << subm;
                 mutations.push_back(wr.toValue());
                 for (int index : range.value()) {
-                    adds.push_back(
-                        addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize));
+                    if (message.getVersion() >= beginVersions[index]) {
+                        adds.push_back(
+                            addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize));
+                    }
                 }
             }
         }
@@ -793,12 +842,12 @@ ACTOR Future<Void> uploadData(BackupData* self) {
                 .detail("MsgQ", self->messages.size());
             // save an empty file for old epochs so that log file versions are continuous
             wait(saveMutationsToFile(self, popVersion, numMsg));
-            self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg);
+            self->eraseMessages(numMsg);
         }
 
         // If transition into NOOP mode, should clear messages
         if (!self->pulling) {
-            self->messages.clear();
+            self->eraseMessages(self->messages.size());
         }
 
         if (popVersion > self->savedVersion && popVersion > self->popVersion) {
@@ -812,7 +861,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
         }
 
         if (self->allMessageSaved()) {
-            self->messages.clear();
+            self->eraseMessages(self->messages.size());
             return Void();
         }
@@ -827,6 +876,7 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
     state Future<Void> logSystemChange = Void();
     state Reference<ILogSystem::IPeekCursor> r;
     state Version tagAt = std::max(self->pulledVersion.get(), std::max(self->startVersion, self->savedVersion));
+    state Arena prev;
 
     TraceEvent("BackupWorkerPull", self->myId);
     loop {
@@ -852,6 +902,15 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
         // Note we aggressively peek (uncommitted) messages, but only committed
         // messages/mutations will be flushed to disk/blob in uploadData().
         while (r->hasMessage()) {
+            if (!sameArena(prev, r->arena())) {
+                TraceEvent(SevDebugMemory, "BackupWorkerMemory", self->myId)
+                    .detail("Take", r->arena().getSize())
+                    .detail("Arena", (void*)r->arena().impl.getPtr())
+                    .detail("Current", self->lock->activePermits());
+
+                wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize()));
+                prev = r->arena();
+            }
             self->messages.emplace_back(r->version(), r->getMessage(), r->getTags(), r->arena());
             r->nextMessage();
         }
diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp
index 5cec2a78a0..12fdadf4b3 100644
--- a/fdbserver/Knobs.cpp
+++ b/fdbserver/Knobs.cpp
@@ -387,7 +387,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
     init( BACKUP_TIMEOUT, 0.4 );
     init( BACKUP_NOOP_POP_DELAY, 5.0 );
     init( BACKUP_FILE_BLOCK_BYTES, 1024 * 1024 );
-    init( BACKUP_UPLOAD_DELAY, 10.0 ); if( randomize && BUGGIFY ) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 20; // TODO: Increase delay range
+    init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(1024, 4096) * 1024;
+    init( BACKUP_UPLOAD_DELAY, 10.0 ); if(randomize && BUGGIFY) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 60;
 
     //Cluster Controller
     init( CLUSTER_CONTROLLER_LOGGING_DELAY, 5.0 );
diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h
index 0f77188da1..eca7fc15ee 100644
--- a/fdbserver/Knobs.h
+++ b/fdbserver/Knobs.h
@@ -313,6 +313,7 @@ public:
     double BACKUP_TIMEOUT; // master's reaction time for backup failure
     double BACKUP_NOOP_POP_DELAY;
     int BACKUP_FILE_BLOCK_BYTES;
+    int64_t BACKUP_LOCK_BYTES;
     double BACKUP_UPLOAD_DELAY;
 
     //Cluster Controller
diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp
index d6f6379859..c919e77778 100644
--- a/fdbserver/RestoreLoader.actor.cpp
+++ b/fdbserver/RestoreLoader.actor.cpp
@@ -216,6 +216,9 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
         VersionedMutationsMap::iterator it;
         bool inserted;
         std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
+        // A clear mutation can be split into multiple mutations with the same (version, sub).
+        // See saveMutationsToFile(). Current tests only use one key range per backup, thus
+        // only one clear mutation is generated (i.e., always inserted).
         ASSERT(inserted);
 
         ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(currentProtocolVersion));
diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp
index 283a0d7210..16794b749f 100644
--- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp
+++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp
@@ -399,9 +399,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
         if (!self->locked && BUGGIFY) {
             TraceEvent("BARW_SubmitBackup2", randomID).detail("Tag", printable(self->backupTag));
             try {
+                // Note the "partitionedLog" must be false, because we change
+                // the configuration to disable backup workers before restore.
                 extraBackup = backupAgent.submitBackup(
                     cx, LiteralStringRef("file://simfdb/backups/"), deterministicRandom()->randomInt(0, 100),
-                    self->backupTag.toString(), self->backupRanges, true, self->usePartitionedLogs);
+                    self->backupTag.toString(), self->backupRanges, true, false);
             } catch (Error& e) {
                 TraceEvent("BARW_SubmitBackup2Exception", randomID)
                     .error(e)
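
A note on the BackupProgress.actor.cpp change above: the old code consolidated progress only from the immediately preceding epoch, so a tag whose last saved version was recorded two or more epochs back could be missed; the new loop walks backwards through earlier epochs until every tag in `tags` has been accounted for. The following is a minimal standalone illustration of that consolidation loop, using plain ints in std::map/std::set as stand-ins for the real LogEpoch/Tag/Version types (the epoch and version numbers are invented for the example):

#include <algorithm>
#include <cassert>
#include <iterator>
#include <map>
#include <set>

int main() {
    // epoch -> (tag -> saved version); epoch 3 has no progress recorded for tag 2.
    std::map<int, std::map<int, int>> progress = {
        { 1, { { 2, 50 } } },
        { 2, { { 1, 80 } } },
        { 3, { { 1, 100 } } },
    };
    std::set<int> tags = { 1, 2 };
    const int epoch = 3;

    auto progressIt = progress.lower_bound(epoch);
    assert(progressIt != progress.end() && progressIt->first == epoch);

    // Walk backwards through earlier epochs, consolidating the most recent
    // saved version for each tag not yet seen.
    std::set<int> toCheck = tags;
    for (auto current = progressIt; current != progress.begin() && !toCheck.empty();) {
        auto prev = std::prev(current);
        for (auto [tag, version] : prev->second) {
            if (toCheck.count(tag) > 0) {
                progressIt->second[tag] = std::max(version, progressIt->second[tag]);
                toCheck.erase(tag);
            }
        }
        current = prev;
    }

    // Tag 2's version 50 is picked up from epoch 1, two epochs back; the old
    // single-step look-back would have stopped at epoch 2 and missed it.
    assert(progressIt->second.at(2) == 50);
    assert(progressIt->second.at(1) == 100);
    return 0;
}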
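
A note on the BackupWorker.actor.cpp memory accounting above: pullAsyncData() takes FlowLock permits once per distinct peeked Arena (messages sharing an arena are charged once), each VersionedMessage records its arena's size at insertion time, and eraseMessages() releases a message's bytes only when the next message lives in a different arena. Below is a minimal standalone sketch of that bookkeeping in plain C++; FakeArena, ByteBudget, and Queue are invented stand-ins for flow's Arena, FlowLock, and the worker's message deque, and the non-blocking take() glosses over the fact that the real FlowLock::take() is an actor that waits once the BACKUP_LOCK_BYTES budget is exhausted:

#include <cassert>
#include <cstdint>
#include <deque>
#include <memory>

// Stand-in for flow's Arena: identity is the shared block, size is its allocation.
struct FakeArena {
    explicit FakeArena(size_t s) : size(s) {}
    size_t size;
};
using ArenaRef = std::shared_ptr<FakeArena>;

// Stand-in for FlowLock: counts outstanding permits (buffered bytes).
struct ByteBudget {
    int64_t active = 0;
    void take(int64_t n) { active += n; } // the real take() can block at the limit
    void release(int64_t n) { active -= n; assert(active >= 0); }
};

struct Message {
    ArenaRef arena;
    size_t bytes; // arena size recorded at insertion time
};

struct Queue {
    std::deque<Message> messages;
    ByteBudget budget;
    ArenaRef prev;

    // Mirrors the pull loop: charge the budget once per distinct arena.
    void push(const ArenaRef& arena) {
        if (arena != prev) {
            budget.take(arena->size);
            prev = arena;
        }
        messages.push_back({ arena, arena->size });
    }

    // Mirrors eraseMessages(): release a message's bytes only when the next
    // message uses a different arena, so a shared arena is released exactly once.
    void erase(size_t num) {
        assert(num <= messages.size());
        if (num == messages.size()) {
            budget.release(budget.active);
            messages.clear();
            return;
        }
        int64_t bytes = 0;
        for (size_t i = 0; i < num; i++) {
            if (messages[i].arena != messages[i + 1].arena) bytes += messages[i].bytes;
        }
        budget.release(bytes);
        messages.erase(messages.begin(), messages.begin() + num);
    }
};

int main() {
    Queue q;
    auto a = std::make_shared<FakeArena>(100);
    auto b = std::make_shared<FakeArena>(200);
    q.push(a); q.push(a); q.push(b); // budget = 300; arena 'a' is charged once
    q.erase(1);                      // still 300: 'a' is shared with the next message
    q.erase(1);                      // 200: 'a' is released when crossing into 'b'
    assert(q.budget.active == 200);
    return 0;
}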
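
A note on the per-backup begin versions added to saveMutationsToFile() above: a newly registered backup's first log file can be trued up to a begin version later than the oldest buffered message, so each buffered mutation is now written only to the log files whose begin version it has reached. A tiny standalone sketch of that filter (the two begin versions and the message versions are invented for illustration):

#include <cassert>
#include <vector>

int main() {
    // Two open log files with different begin versions, e.g. a long-running
    // backup and one whose first file was just trued up to a later version.
    std::vector<long> beginVersions = { 100, 250 };

    // Versions of the buffered messages pulled from the tlogs.
    std::vector<long> messageVersions = { 120, 200, 300 };

    std::vector<std::vector<long>> written(beginVersions.size());
    for (long v : messageVersions) {
        for (size_t index = 0; index < beginVersions.size(); index++) {
            // Mirrors: if (message.getVersion() >= beginVersions[index]) addMutation(...);
            if (v >= beginVersions[index]) written[index].push_back(v);
        }
    }

    assert((written[0] == std::vector<long>{ 120, 200, 300 })); // file 0 gets every message
    assert((written[1] == std::vector<long>{ 300 }));           // file 1 skips versions before 250
    return 0;
}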