From 1a35efe43c1ab70c4b636164e911aa8b39611515 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 21 Apr 2020 15:12:37 -0700 Subject: [PATCH 01/12] Add an assertion: mutation version >= log's begin version This is to check that no version's data are split into two files. --- fdbserver/BackupWorker.actor.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 58c4f4b682..b25d65eb9c 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -666,6 +666,7 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int } logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile( it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags)); + ASSERT(self->messages.empty() || self->messages[0].getVersion() >= it->second.lastSavedVersion); it++; } From 17915e13b0d531b928b2285e3d8b2fbb96f1acb3 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 14 May 2020 12:05:16 -0700 Subject: [PATCH 02/12] Limit memory usage of backup workers --- fdbserver/BackupWorker.actor.cpp | 36 ++++++++++++++++++++++++++++---- fdbserver/Knobs.cpp | 3 ++- fdbserver/Knobs.h | 1 + 3 files changed, 35 insertions(+), 5 deletions(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index b25d65eb9c..e33ab4b578 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -84,6 +84,7 @@ struct BackupData { bool stopped = false; bool exitEarly = false; // If the worker is on an old epoch and all backups starts a version >= the endVersion AsyncVar paused; // Track if "backupPausedKey" is set. + Reference lock; struct PerBackupInfo { PerBackupInfo() = default; @@ -231,12 +232,14 @@ struct BackupData { : myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion), endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch), minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1), - cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) { + cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false), + lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)) { cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true); specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; }); specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; }); specialCounter(cc, "MsgQ", [this]() { return this->messages.size(); }); + specialCounter(cc, "BufferedBytes", [this]() { return this->lock->activePermits(); }); logger = traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "BackupWorkerMetrics"); } @@ -310,6 +313,30 @@ struct BackupData { doneTrigger.trigger(); } + // Erases messages and updates lock with memory released. + void eraseMessages(int num) { + ASSERT(num <= messages.size()); + if (num == 0) return; + + if (messages.size() == num) { + messages.clear(); + lock->release(); + return; + } + + // keep track of each arena and accumulate their sizes + int64_t bytes = 0; + for (int i = 0; i < num; i++) { + const Arena& a = messages[i].arena; + const Arena& b = messages[i + 1].arena; + if (a.impl.getPtr() != b.impl.getPtr()) { + bytes += a.getSize(); + } + } + lock->release(bytes); + messages.erase(messages.begin(), messages.begin() + num); + } + void eraseMessagesAfterEndVersion() { ASSERT(endVersion.present()); const Version ver = endVersion.get(); @@ -794,12 +821,12 @@ ACTOR Future uploadData(BackupData* self) { .detail("MsgQ", self->messages.size()); // save an empty file for old epochs so that log file versions are continuous wait(saveMutationsToFile(self, popVersion, numMsg)); - self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg); + self->eraseMessages(numMsg); } // If transition into NOOP mode, should clear messages if (!self->pulling) { - self->messages.clear(); + self->eraseMessages(self->messages.size()); } if (popVersion > self->savedVersion && popVersion > self->popVersion) { @@ -813,7 +840,7 @@ ACTOR Future uploadData(BackupData* self) { } if (self->allMessageSaved()) { - self->messages.clear(); + self->eraseMessages(self->messages.size()); return Void(); } @@ -854,6 +881,7 @@ ACTOR Future pullAsyncData(BackupData* self) { // messages/mutations will be flushed to disk/blob in uploadData(). while (r->hasMessage()) { self->messages.emplace_back(r->version(), r->getMessage(), r->getTags(), r->arena()); + wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize())); r->nextMessage(); } diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index f1b1b26034..fb11648abf 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -387,7 +387,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( BACKUP_TIMEOUT, 0.4 ); init( BACKUP_NOOP_POP_DELAY, 5.0 ); init( BACKUP_FILE_BLOCK_BYTES, 1024 * 1024 ); - init( BACKUP_UPLOAD_DELAY, 10.0 ); if( randomize && BUGGIFY ) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 20; // TODO: Increase delay range + init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(4 * 1024, 100 * 1024); + init( BACKUP_UPLOAD_DELAY, 10.0 ); if(randomize && BUGGIFY) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 60; //Cluster Controller init( CLUSTER_CONTROLLER_LOGGING_DELAY, 5.0 ); diff --git a/fdbserver/Knobs.h b/fdbserver/Knobs.h index 0569660c40..3b0433b408 100644 --- a/fdbserver/Knobs.h +++ b/fdbserver/Knobs.h @@ -313,6 +313,7 @@ public: double BACKUP_TIMEOUT; // master's reaction time for backup failure double BACKUP_NOOP_POP_DELAY; int BACKUP_FILE_BLOCK_BYTES; + int64_t BACKUP_LOCK_BYTES; double BACKUP_UPLOAD_DELAY; //Cluster Controller From 01eff0fc0369a27034d1c424f75e218564a98ef1 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Thu, 14 May 2020 19:46:30 -0700 Subject: [PATCH 03/12] Fix memory bytes accounting Avoid duplicated counting of arena memory since messages from peek cursor can share arena. --- fdbserver/BackupWorker.actor.cpp | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index e33ab4b578..28f22338d1 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -64,6 +64,10 @@ struct VersionedMessage { } }; +static bool sameArena(const Arena& a, const Arena& b) { + return a.impl.getPtr() == b.impl.getPtr(); +} + struct BackupData { const UID myId; const Tag tag; // LogRouter tag for this worker, i.e., (-2, i) @@ -320,7 +324,7 @@ struct BackupData { if (messages.size() == num) { messages.clear(); - lock->release(); + lock->release(lock->activePermits()); return; } @@ -329,7 +333,7 @@ struct BackupData { for (int i = 0; i < num; i++) { const Arena& a = messages[i].arena; const Arena& b = messages[i + 1].arena; - if (a.impl.getPtr() != b.impl.getPtr()) { + if (!sameArena(a, b)) { bytes += a.getSize(); } } @@ -693,7 +697,7 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int } logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile( it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags)); - ASSERT(self->messages.empty() || self->messages[0].getVersion() >= it->second.lastSavedVersion); + // ASSERT(self->messages.empty() || self->messages[0].getVersion() >= it->second.lastSavedVersion); it++; } @@ -879,9 +883,13 @@ ACTOR Future pullAsyncData(BackupData* self) { // Note we aggressively peek (uncommitted) messages, but only committed // messages/mutations will be flushed to disk/blob in uploadData(). + state Arena prev; while (r->hasMessage()) { self->messages.emplace_back(r->version(), r->getMessage(), r->getTags(), r->arena()); - wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize())); + if (!sameArena(prev, r->arena())) { + wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize())); + prev = r->arena(); + } r->nextMessage(); } From 89ae1200dd160380180878cdb7bc6685af0cbd4c Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 15 May 2020 09:45:52 -0700 Subject: [PATCH 04/12] Increase backup worker memory buffer size in simulation Lower limit causes backup worker to become stuck. --- fdbserver/Knobs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/Knobs.cpp b/fdbserver/Knobs.cpp index fb11648abf..cf9505ac4b 100644 --- a/fdbserver/Knobs.cpp +++ b/fdbserver/Knobs.cpp @@ -387,7 +387,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi init( BACKUP_TIMEOUT, 0.4 ); init( BACKUP_NOOP_POP_DELAY, 5.0 ); init( BACKUP_FILE_BLOCK_BYTES, 1024 * 1024 ); - init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(4 * 1024, 100 * 1024); + init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(1024, 4096) * 1024; init( BACKUP_UPLOAD_DELAY, 10.0 ); if(randomize && BUGGIFY) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 60; //Cluster Controller From caca31d05a9d73e91fe99d9cd1e4e71543cbce61 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Fri, 15 May 2020 20:06:47 -0700 Subject: [PATCH 05/12] Filter out mutations before the true-up version When a mutation log's begin version is true-uped, we must filter out mutations less than such a version. --- fdbserver/BackupWorker.actor.cpp | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 28f22338d1..58ce7ff3f8 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -668,6 +668,7 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int state std::vector> logFiles; state std::vector blockEnds; state std::vector activeUids; // active Backups' UIDs + state std::vector beginVersions; // logFiles' begin versions state KeyRangeMap> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds state std::vector> mutations; state int idx; @@ -686,18 +687,22 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int const int index = logFileFutures.size(); activeUids.push_back(it->first); self->insertRanges(keyRangeMap, it->second.ranges.get(), index); + if (it->second.lastSavedVersion == invalidVersion) { if (it->second.startVersion > self->startVersion && !self->messages.empty()) { // True-up first mutation log's begin version it->second.lastSavedVersion = self->messages[0].getVersion(); } else { - it->second.lastSavedVersion = - std::max(self->popVersion, std::max(self->savedVersion, self->startVersion)); + it->second.lastSavedVersion = std::max({ self->popVersion, self->savedVersion, self->startVersion }); } + TraceEvent("BackupWorkerTrueUp", self->myId).detail("LastSavedVersion", it->second.lastSavedVersion); } + // The true-up version can be larger than first message version, so keep + // the begin versions for later muation filtering. + beginVersions.push_back(it->second.lastSavedVersion); + logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile( it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags)); - // ASSERT(self->messages.empty() || self->messages[0].getVersion() >= it->second.lastSavedVersion); it++; } @@ -707,7 +712,7 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int std::transform(logFileFutures.begin(), logFileFutures.end(), std::back_inserter(logFiles), [](const Future>& f) { return f.get(); }); - ASSERT(activeUids.size() == logFiles.size()); + ASSERT(activeUids.size() == logFiles.size() && beginVersions.size() == logFiles.size()); for (int i = 0; i < logFiles.size(); i++) { TraceEvent("OpenMutationFile", self->myId) .detail("BackupID", activeUids[i]) @@ -732,7 +737,10 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int std::vector> adds; if (m.type != MutationRef::Type::ClearRange) { for (int index : keyRangeMap[m.param1]) { - adds.push_back(addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize)); + if (message.getVersion() >= beginVersions[index]) { + adds.push_back( + addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize)); + } } } else { KeyRangeRef mutationRange(m.param1, m.param2); @@ -747,8 +755,10 @@ ACTOR Future saveMutationsToFile(BackupData* self, Version popVersion, int wr << subm; mutations.push_back(wr.toValue()); for (int index : range.value()) { - adds.push_back( - addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize)); + if (message.getVersion() >= beginVersions[index]) { + adds.push_back( + addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize)); + } } } } From a2e50504922f4bef280ce332614096f1955ac708 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Sat, 16 May 2020 10:52:11 -0700 Subject: [PATCH 06/12] Fix duplicated mutation This seems to be related to how actor compiler generates code. The message can be inserted twice with original code ordering. --- fdbserver/BackupWorker.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 58ce7ff3f8..cf9feb5f6c 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -895,11 +895,11 @@ ACTOR Future pullAsyncData(BackupData* self) { // messages/mutations will be flushed to disk/blob in uploadData(). state Arena prev; while (r->hasMessage()) { - self->messages.emplace_back(r->version(), r->getMessage(), r->getTags(), r->arena()); if (!sameArena(prev, r->arena())) { wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize())); prev = r->arena(); } + self->messages.emplace_back(r->version(), r->getMessage(), r->getTags(), r->arena()); r->nextMessage(); } From 9fbbec10334fb4d08b72890ca2801f3e5e70e24e Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Sat, 16 May 2020 18:50:09 -0700 Subject: [PATCH 07/12] Fix duplicated counting of memory usage For each message from LogPeekCursor, check it's using different arena from the previous one. Otherwise, the arena's memory could be counted twice. --- fdbserver/BackupWorker.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index cf9feb5f6c..67c8cb9868 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -869,6 +869,7 @@ ACTOR Future pullAsyncData(BackupData* self) { state Future logSystemChange = Void(); state Reference r; state Version tagAt = std::max(self->pulledVersion.get(), std::max(self->startVersion, self->savedVersion)); + state Arena prev; TraceEvent("BackupWorkerPull", self->myId); loop { @@ -893,7 +894,6 @@ ACTOR Future pullAsyncData(BackupData* self) { // Note we aggressively peek (uncommitted) messages, but only committed // messages/mutations will be flushed to disk/blob in uploadData(). - state Arena prev; while (r->hasMessage()) { if (!sameArena(prev, r->arena())) { wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize())); From 084afbd22d6d108b646ee41637d40207a06f529f Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Sun, 17 May 2020 10:18:30 -0700 Subject: [PATCH 08/12] True-up backup progress may go back multiple epochs Because the previous epoch may not save some tags, true-up backup progress may need to go back more than one epoch. --- fdbserver/BackupProgress.actor.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/fdbserver/BackupProgress.actor.cpp b/fdbserver/BackupProgress.actor.cpp index 985fcb7f93..ac6f6fef8b 100644 --- a/fdbserver/BackupProgress.actor.cpp +++ b/fdbserver/BackupProgress.actor.cpp @@ -83,21 +83,24 @@ std::map, std::map> BackupProgr auto progressIt = progress.lower_bound(epoch); if (progressIt != progress.end() && progressIt->first == epoch) { - if (progressIt != progress.begin()) { - // Previous epoch is gone, consolidate the progress. + std::set toCheck = tags; + for (auto current = progressIt; current != progress.begin() && !toCheck.empty();) { auto prev = std::prev(progressIt); + // Previous epoch is gone, consolidate the progress. for (auto [tag, version] : prev->second) { - if (tags.count(tag) > 0) { + if (toCheck.count(tag) > 0) { progressIt->second[tag] = std::max(version, progressIt->second[tag]); + toCheck.erase(tag); } } + current = prev; } updateTagVersions(&tagVersions, &tags, progressIt->second, info.epochEnd, adjustedBeginVersion, epoch); } else { auto rit = std::find_if( progress.rbegin(), progress.rend(), [epoch = epoch](const std::pair>& p) { return p.first < epoch; }); - if (!(rit == progress.rend())) { + while (!(rit == progress.rend())) { // A partial recovery can result in empty epoch that copies previous // epoch's version range. In this case, we should check previous // epoch's savedVersion. @@ -112,7 +115,9 @@ std::map, std::map> BackupProgr // ASSERT(info.logRouterTags == epochTags[rit->first]); updateTagVersions(&tagVersions, &tags, rit->second, info.epochEnd, adjustedBeginVersion, epoch); + break; } + rit++; } } From 9e9591a07acd3d2168d967a7d6520a416045c4db Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 19 May 2020 15:15:39 -0700 Subject: [PATCH 09/12] Fix an infinite loop --- fdbserver/BackupProgress.actor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fdbserver/BackupProgress.actor.cpp b/fdbserver/BackupProgress.actor.cpp index ac6f6fef8b..898ce31b70 100644 --- a/fdbserver/BackupProgress.actor.cpp +++ b/fdbserver/BackupProgress.actor.cpp @@ -85,7 +85,7 @@ std::map, std::map> BackupProgr if (progressIt != progress.end() && progressIt->first == epoch) { std::set toCheck = tags; for (auto current = progressIt; current != progress.begin() && !toCheck.empty();) { - auto prev = std::prev(progressIt); + auto prev = std::prev(current); // Previous epoch is gone, consolidate the progress. for (auto [tag, version] : prev->second) { if (toCheck.count(tag) > 0) { From 4315af25e777f9f8a8f0a07c468ea700f78f106b Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Tue, 19 May 2020 15:18:45 -0700 Subject: [PATCH 10/12] Fix unfinished restore tests Because backup worker is disabled before performing the restore, we can't allow a new backup of the new type running. --- .../workloads/BackupAndParallelRestoreCorrectness.actor.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp index 283a0d7210..16794b749f 100644 --- a/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp +++ b/fdbserver/workloads/BackupAndParallelRestoreCorrectness.actor.cpp @@ -399,9 +399,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload { if (!self->locked && BUGGIFY) { TraceEvent("BARW_SubmitBackup2", randomID).detail("Tag", printable(self->backupTag)); try { + // Note the "partitionedLog" must be false, because we change + // the configuration to disable backup workers before restore. extraBackup = backupAgent.submitBackup( cx, LiteralStringRef("file://simfdb/backups/"), deterministicRandom()->randomInt(0, 100), - self->backupTag.toString(), self->backupRanges, true, self->usePartitionedLogs); + self->backupTag.toString(), self->backupRanges, true, false); } catch (Error& e) { TraceEvent("BARW_SubmitBackup2Exception", randomID) .error(e) From cdeabc4de6539b617f36f2fcc70def6b48bb2ccc Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 20 May 2020 13:26:57 -0700 Subject: [PATCH 11/12] Fix memory accounting error due to growing Arena After an Arena object is counted, it can grow larger later. So we can't reduce the amount of memory of arena size later. Instead, we use the arena size when inserting mutations. --- fdbserver/BackupWorker.actor.cpp | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/fdbserver/BackupWorker.actor.cpp b/fdbserver/BackupWorker.actor.cpp index 67c8cb9868..430bcc82ca 100644 --- a/fdbserver/BackupWorker.actor.cpp +++ b/fdbserver/BackupWorker.actor.cpp @@ -34,14 +34,17 @@ #include "flow/actorcompiler.h" // This must be the last #include. +#define SevDebugMemory SevVerbose + struct VersionedMessage { LogMessageVersion version; StringRef message; VectorRef tags; Arena arena; // Keep a reference to the memory containing the message + size_t bytes; // arena's size when inserted, which can grow afterwards VersionedMessage(LogMessageVersion v, StringRef m, const VectorRef& t, const Arena& a) - : version(v), message(m), tags(t), arena(a) {} + : version(v), message(m), tags(t), arena(a), bytes(a.getSize()) {} const Version getVersion() const { return version.version; } const uint32_t getSubVersion() const { return version.sub; } @@ -324,6 +327,7 @@ struct BackupData { if (messages.size() == num) { messages.clear(); + TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId).detail("ReleaseAll", lock->activePermits()); lock->release(lock->activePermits()); return; } @@ -334,7 +338,10 @@ struct BackupData { const Arena& a = messages[i].arena; const Arena& b = messages[i + 1].arena; if (!sameArena(a, b)) { - bytes += a.getSize(); + bytes += messages[i].bytes; + TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId) + .detail("Release", messages[i].bytes) + .detail("Arena", (void*)a.impl.getPtr()); } } lock->release(bytes); @@ -896,6 +903,11 @@ ACTOR Future pullAsyncData(BackupData* self) { // messages/mutations will be flushed to disk/blob in uploadData(). while (r->hasMessage()) { if (!sameArena(prev, r->arena())) { + TraceEvent(SevDebugMemory, "BackupWorkerMemory", self->myId) + .detail("Take", r->arena().getSize()) + .detail("Arena", (void*)r->arena().impl.getPtr()) + .detail("Current", self->lock->activePermits()); + wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize())); prev = r->arena(); } From 3bf38c1acd154fcd31c25b24da75efbc3b9683e5 Mon Sep 17 00:00:00 2001 From: Jingyu Zhou Date: Wed, 20 May 2020 14:42:30 -0700 Subject: [PATCH 12/12] Add a comment on restore's clear mutation --- fdbserver/RestoreLoader.actor.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fdbserver/RestoreLoader.actor.cpp b/fdbserver/RestoreLoader.actor.cpp index d6f6379859..c919e77778 100644 --- a/fdbserver/RestoreLoader.actor.cpp +++ b/fdbserver/RestoreLoader.actor.cpp @@ -216,6 +216,9 @@ ACTOR static Future _parsePartitionedLogFileOnLoader( VersionedMutationsMap::iterator it; bool inserted; std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec()); + // A clear mutation can be split into multiple mutations with the same (version, sub). + // See saveMutationsToFile(). Current tests only use one key range per backup, thus + // only one clear mutation is generated (i.e., always inserted). ASSERT(inserted); ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(currentProtocolVersion));