Merge pull request #3172 from jzhou77/backup-fix

Limit memory usage of backup workers
Meng Xu 2020-05-20 15:46:11 -07:00 committed by GitHub
commit c0c15130b8
6 changed files with 88 additions and 17 deletions
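
The change bounds how much peeked-but-not-yet-uploaded log data a backup worker keeps in memory. A FlowLock sized by the new BACKUP_LOCK_BYTES knob acts as a byte budget: pullAsyncData() takes permits for each new peek arena before buffering its messages, and eraseMessages() releases them once uploadData() has flushed those messages to backup files. Below is a minimal, self-contained sketch of the same take-before-buffer / release-after-flush pattern; ByteBudget is an invented stand-in for FlowLock, not FoundationDB code.

// Minimal sketch of the byte-budget pattern used by this change; a plain
// condition variable stands in for FoundationDB's FlowLock.
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>

class ByteBudget {                         // illustrative stand-in for FlowLock
public:
    explicit ByteBudget(int64_t limit) : limit_(limit) {}
    void take(int64_t bytes) {             // blocks until 'bytes' fit under the limit
        std::unique_lock<std::mutex> g(m_);
        cv_.wait(g, [&] { return used_ + bytes <= limit_; });
        used_ += bytes;
    }
    void release(int64_t bytes) {
        std::lock_guard<std::mutex> g(m_);
        used_ -= bytes;
        cv_.notify_all();
    }
    int64_t activePermits() {              // outstanding, not-yet-released bytes
        std::lock_guard<std::mutex> g(m_);
        return used_;
    }
private:
    int64_t limit_;
    int64_t used_ = 0;
    std::mutex m_;
    std::condition_variable cv_;
};

int main() {
    ByteBudget budget(1 << 20);            // plays the role of BACKUP_LOCK_BYTES
    std::deque<std::string> messages;

    std::string batch(4096, 'm');          // one peeked batch (one Arena)
    budget.take((int64_t)batch.size());    // charge before buffering, like pullAsyncData()
    messages.push_back(batch);

    // ... the upload side would flush 'messages' to backup files here ...

    budget.release((int64_t)messages.front().size()); // release after flush, like eraseMessages()
    messages.pop_front();
    std::cout << "outstanding bytes: " << budget.activePermits() << "\n"; // 0
}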


@@ -83,21 +83,24 @@ std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgr
auto progressIt = progress.lower_bound(epoch);
if (progressIt != progress.end() && progressIt->first == epoch) {
if (progressIt != progress.begin()) {
std::set<Tag> toCheck = tags;
for (auto current = progressIt; current != progress.begin() && !toCheck.empty();) {
auto prev = std::prev(current);
// Previous epoch is gone, consolidate the progress.
auto prev = std::prev(progressIt);
for (auto [tag, version] : prev->second) {
if (tags.count(tag) > 0) {
if (toCheck.count(tag) > 0) {
progressIt->second[tag] = std::max(version, progressIt->second[tag]);
toCheck.erase(tag);
}
}
current = prev;
}
updateTagVersions(&tagVersions, &tags, progressIt->second, info.epochEnd, adjustedBeginVersion, epoch);
} else {
auto rit = std::find_if(
progress.rbegin(), progress.rend(),
[epoch = epoch](const std::pair<LogEpoch, std::map<Tag, Version>>& p) { return p.first < epoch; });
if (!(rit == progress.rend())) {
while (!(rit == progress.rend())) {
// A partial recovery can result in an empty epoch that copies the previous
// epoch's version range. In this case, we should check the previous
// epoch's savedVersion.
@@ -112,7 +115,9 @@ std::map<std::tuple<LogEpoch, Version, int>, std::map<Tag, Version>> BackupProgr
// ASSERT(info.logRouterTags == epochTags[rit->first]);
updateTagVersions(&tagVersions, &tags, rit->second, info.epochEnd, adjustedBeginVersion, epoch);
break;
}
rit++;
}
}
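
In the hunks above, the old consolidation folded in only the immediately preceding epoch (the replaced `auto prev = std::prev(progressIt);` and `if (tags.count(tag) > 0)` lines), so a tag that made no progress in that one epoch would never pick up an earlier saved version. The new code keeps a `toCheck` set and walks backwards through all earlier epochs until every tag is covered, and the later `if (!(rit == progress.rend()))` becomes a `while` for the same reason: empty epochs left by partial recoveries must be skipped over, not treated as the final answer. A standalone sketch of the backward walk, with plain ints standing in for LogEpoch, Tag, and Version (illustrative types only):

// Sketch of the backward consolidation: fold the newest saved version of each
// tag from earlier epochs into the current epoch, stopping once every tag is
// covered (plain ints stand in for LogEpoch, Tag, and Version).
#include <algorithm>
#include <iostream>
#include <iterator>
#include <map>
#include <set>

int main() {
    // epoch -> (tag -> saved version)
    std::map<int, std::map<int, int>> progress = {
        { 1, { { 0, 100 }, { 1, 120 } } },
        { 2, { { 1, 150 } } },             // tag 0 made no progress in epoch 2
        { 3, {} },                         // current epoch, nothing saved yet
    };

    auto progressIt = progress.find(3);    // entry being consolidated into
    std::set<int> toCheck = { 0, 1 };      // tags still missing a saved version
    for (auto current = progressIt; current != progress.begin() && !toCheck.empty();) {
        auto prev = std::prev(current);
        for (auto [tag, version] : prev->second) {
            if (toCheck.count(tag) > 0) {
                auto& v = progressIt->second[tag];
                v = std::max(version, v);
                toCheck.erase(tag);
            }
        }
        current = prev;
    }

    for (auto [tag, version] : progressIt->second)
        std::cout << "tag " << tag << " -> version " << version << "\n";
    // prints: tag 0 -> version 100, tag 1 -> version 150
}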


@@ -34,14 +34,17 @@
#include "flow/actorcompiler.h" // This must be the last #include.
#define SevDebugMemory SevVerbose
struct VersionedMessage {
LogMessageVersion version;
StringRef message;
VectorRef<Tag> tags;
Arena arena; // Keep a reference to the memory containing the message
size_t bytes; // arena's size when inserted, which can grow afterwards
VersionedMessage(LogMessageVersion v, StringRef m, const VectorRef<Tag>& t, const Arena& a)
: version(v), message(m), tags(t), arena(a) {}
: version(v), message(m), tags(t), arena(a), bytes(a.getSize()) {}
const Version getVersion() const { return version.version; }
const uint32_t getSubVersion() const { return version.sub; }
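
VersionedMessage now records `bytes`, the arena's size at the moment the message is buffered; as the comment notes, the arena can keep growing afterwards, so it is this recorded figure, not the arena's eventual size, that gets released when the message is erased. A toy illustration of the recorded-size-versus-current-size distinction (ToyArena and ToyVersionedMessage are invented stand-ins, not the real types):

// Sketch: record the buffer's size at the moment a message is buffered; the
// shared backing buffer may keep growing afterwards.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct ToyArena {                               // toy stand-in for Arena
    std::shared_ptr<std::string> impl;
    size_t getSize() const { return impl->size(); }
};
struct ToyVersionedMessage {
    ToyArena arena;
    size_t bytes;                               // arena's size when the message was buffered
};

int main() {
    ToyArena a{ std::make_shared<std::string>(1000, 'x') };
    std::vector<ToyVersionedMessage> messages;
    messages.push_back({ a, a.getSize() });     // record 1000 at insertion time

    a.impl->append(500, 'y');                   // the shared buffer grows afterwards
    std::cout << "recorded " << messages[0].bytes
              << " bytes, arena is now " << a.getSize() << " bytes\n";  // 1000 vs 1500
}
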
@@ -64,6 +67,10 @@ struct VersionedMessage {
}
};
static bool sameArena(const Arena& a, const Arena& b) {
return a.impl.getPtr() == b.impl.getPtr();
}
struct BackupData {
const UID myId;
const Tag tag; // LogRouter tag for this worker, i.e., (-2, i)
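
sameArena() is an identity check on the reference-counted `impl` pointer, not a comparison of contents: consecutive messages produced from one peek share a single Arena, and the accounting in this file charges and releases that arena exactly once. A small sketch of the identity-versus-equality distinction, with shared_ptr standing in for Arena's impl reference (toy names only):

// Sketch: sameArena() asks "same underlying allocation?", not "equal contents?".
#include <iostream>
#include <memory>
#include <string>

struct ToyArena { std::shared_ptr<std::string> impl; };   // toy stand-in for Arena

static bool sameToyArena(const ToyArena& a, const ToyArena& b) {
    return a.impl.get() == b.impl.get();                   // same underlying allocation?
}

int main() {
    ToyArena a{ std::make_shared<std::string>("batch-1") };
    ToyArena alias = a;                                        // second handle, same memory
    ToyArena copy{ std::make_shared<std::string>("batch-1") }; // equal contents, different memory

    std::cout << std::boolalpha
              << sameToyArena(a, alias) << "\n"                // true
              << sameToyArena(a, copy) << "\n";                // false
}
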
@@ -84,6 +91,7 @@ struct BackupData {
bool stopped = false;
bool exitEarly = false; // If the worker is on an old epoch and all backups start at a version >= the endVersion
AsyncVar<bool> paused; // Track if "backupPausedKey" is set.
Reference<FlowLock> lock;
struct PerBackupInfo {
PerBackupInfo() = default;
@@ -231,12 +239,14 @@ struct BackupData {
: myId(id), tag(req.routerTag), totalTags(req.totalTags), startVersion(req.startVersion),
endVersion(req.endVersion), recruitedEpoch(req.recruitedEpoch), backupEpoch(req.backupEpoch),
minKnownCommittedVersion(invalidVersion), savedVersion(req.startVersion - 1), popVersion(req.startVersion - 1),
cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false) {
cc("BackupWorker", myId.toString()), pulledVersion(0), paused(false),
lock(new FlowLock(SERVER_KNOBS->BACKUP_LOCK_BYTES)) {
cx = openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true);
specialCounter(cc, "SavedVersion", [this]() { return this->savedVersion; });
specialCounter(cc, "MinKnownCommittedVersion", [this]() { return this->minKnownCommittedVersion; });
specialCounter(cc, "MsgQ", [this]() { return this->messages.size(); });
specialCounter(cc, "BufferedBytes", [this]() { return this->lock->activePermits(); });
logger = traceCounters("BackupWorkerMetrics", myId, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc,
"BackupWorkerMetrics");
}
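
The constructor now creates the lock with SERVER_KNOBS->BACKUP_LOCK_BYTES permits and adds a BufferedBytes metric backed by a lambda over lock->activePermits(), i.e. the bytes currently charged against the budget. A minimal sketch of such a pull-style counter, a lambda sampled only at report time (plain C++, not the real specialCounter/Counter machinery):

// Sketch: a name -> sampler map, evaluated when metrics are reported.
#include <functional>
#include <iostream>
#include <map>
#include <string>

int main() {
    long activePermits = 0;
    std::map<std::string, std::function<long()>> counters;
    counters["BufferedBytes"] = [&]() { return activePermits; };

    activePermits += 4096;                       // the pull side took 4096 bytes of budget
    for (const auto& [name, sample] : counters)
        std::cout << name << " = " << sample() << "\n";   // BufferedBytes = 4096
}
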
@@ -310,6 +320,34 @@ struct BackupData {
doneTrigger.trigger();
}
// Erases messages and updates lock with memory released.
void eraseMessages(int num) {
ASSERT(num <= messages.size());
if (num == 0) return;
if (messages.size() == num) {
messages.clear();
TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId).detail("ReleaseAll", lock->activePermits());
lock->release(lock->activePermits());
return;
}
// keep track of each arena and accumulate their sizes
int64_t bytes = 0;
for (int i = 0; i < num; i++) {
const Arena& a = messages[i].arena;
const Arena& b = messages[i + 1].arena;
if (!sameArena(a, b)) {
bytes += messages[i].bytes;
TraceEvent(SevDebugMemory, "BackupWorkerMemory", myId)
.detail("Release", messages[i].bytes)
.detail("Arena", (void*)a.impl.getPtr());
}
}
lock->release(bytes);
messages.erase(messages.begin(), messages.begin() + num);
}
void eraseMessagesAfterEndVersion() {
ASSERT(endVersion.present());
const Version ver = endVersion.get();
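
eraseMessages() is the release side of the budget, and uploadData() below now calls it instead of erasing or clearing `messages` directly, so permits are always returned. When everything is erased it simply releases all outstanding permits; otherwise it walks the first `num` messages and releases a message's recorded bytes only where the next message is backed by a different arena, so an arena shared by several messages is released once, at the last message that uses it (the early return for `num == messages.size()` also keeps `messages[i + 1]` in range). A self-contained sketch of that walk, with toy types in place of Arena and VersionedMessage:

// Sketch of the release walk: release a message's recorded bytes only where
// the next message lives in a different backing buffer.
#include <deque>
#include <iostream>
#include <memory>

struct ToyArena { std::shared_ptr<int> id; };                  // identity only
struct ToyMessage { ToyArena arena; long bytes; };             // bytes recorded at buffering time

static bool sameToyArena(const ToyArena& a, const ToyArena& b) {
    return a.id.get() == b.id.get();
}

int main() {
    ToyArena a{ std::make_shared<int>(0) };
    ToyArena b{ std::make_shared<int>(0) };
    std::deque<ToyMessage> messages = { { a, 1000 }, { a, 1000 }, { b, 500 } };
    long outstanding = 1500;          // what the pull side charged: arena a once, arena b once

    int num = 2;                      // erase the first two messages only
    long released = 0;
    for (int i = 0; i < num; i++) {
        // messages[i + 1] stays in range because the "erase everything" case returned earlier.
        if (!sameToyArena(messages[i].arena, messages[i + 1].arena))
            released += messages[i].bytes;
    }
    messages.erase(messages.begin(), messages.begin() + num);
    outstanding -= released;
    std::cout << "released " << released << ", outstanding " << outstanding << "\n";  // 1000, 500
}
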
@@ -637,6 +675,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
state std::vector<Reference<IBackupFile>> logFiles;
state std::vector<int64_t> blockEnds;
state std::vector<UID> activeUids; // active Backups' UIDs
state std::vector<Version> beginVersions; // logFiles' begin versions
state KeyRangeMap<std::set<int>> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds
state std::vector<Standalone<StringRef>> mutations;
state int idx;
@@ -655,15 +694,20 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
const int index = logFileFutures.size();
activeUids.push_back(it->first);
self->insertRanges(keyRangeMap, it->second.ranges.get(), index);
if (it->second.lastSavedVersion == invalidVersion) {
if (it->second.startVersion > self->startVersion && !self->messages.empty()) {
// True-up first mutation log's begin version
it->second.lastSavedVersion = self->messages[0].getVersion();
} else {
it->second.lastSavedVersion =
std::max(self->popVersion, std::max(self->savedVersion, self->startVersion));
it->second.lastSavedVersion = std::max({ self->popVersion, self->savedVersion, self->startVersion });
}
TraceEvent("BackupWorkerTrueUp", self->myId).detail("LastSavedVersion", it->second.lastSavedVersion);
}
// The true-up version can be larger than the first message version, so keep
// the begin versions for later mutation filtering.
beginVersions.push_back(it->second.lastSavedVersion);
logFileFutures.push_back(it->second.container.get().get()->writeTaggedLogFile(
it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id, self->totalTags));
it++;
@@ -675,7 +719,7 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
std::transform(logFileFutures.begin(), logFileFutures.end(), std::back_inserter(logFiles),
[](const Future<Reference<IBackupFile>>& f) { return f.get(); });
ASSERT(activeUids.size() == logFiles.size());
ASSERT(activeUids.size() == logFiles.size() && beginVersions.size() == logFiles.size());
for (int i = 0; i < logFiles.size(); i++) {
TraceEvent("OpenMutationFile", self->myId)
.detail("BackupID", activeUids[i])
@@ -700,7 +744,10 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
std::vector<Future<Void>> adds;
if (m.type != MutationRef::Type::ClearRange) {
for (int index : keyRangeMap[m.param1]) {
adds.push_back(addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize));
if (message.getVersion() >= beginVersions[index]) {
adds.push_back(
addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize));
}
}
} else {
KeyRangeRef mutationRange(m.param1, m.param2);
@@ -715,8 +762,10 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
wr << subm;
mutations.push_back(wr.toValue());
for (int index : range.value()) {
adds.push_back(
addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize));
if (message.getVersion() >= beginVersions[index]) {
adds.push_back(
addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize));
}
}
}
}
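
saveMutationsToFile() now keeps `beginVersions`, one entry per active backup, holding the (possibly trued-up) version at which that backup's next log file starts. Because a trued-up begin version can be newer than the oldest buffered message, each mutation, and each sub-mutation produced when a ClearRange is split across key ranges, is written to a file only if its version has reached that file's begin version. A toy sketch of the per-file filter (invented types, not the real IBackupFile/addMutation API):

// Sketch: route each buffered mutation only to log files whose begin version
// it has reached, mirroring the beginVersions[] check added here.
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct ToyMutation { int64_t version; std::string data; };

int main() {
    // One entry per active backup: the version its current log file starts at.
    std::vector<int64_t> beginVersions = { 100, 250 };
    std::vector<std::vector<ToyMutation>> files(beginVersions.size());

    std::vector<ToyMutation> buffered = { { 120, "a" }, { 200, "b" }, { 300, "c" } };
    for (const ToyMutation& m : buffered) {
        for (size_t index = 0; index < files.size(); index++) {
            if (m.version >= beginVersions[index])   // skip mutations older than the file
                files[index].push_back(m);
        }
    }
    for (size_t i = 0; i < files.size(); i++)
        std::cout << "file " << i << " got " << files[i].size() << " mutations\n";
    // file 0 got 3 mutations, file 1 got 1 mutation
}
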
@@ -793,12 +842,12 @@ ACTOR Future<Void> uploadData(BackupData* self) {
.detail("MsgQ", self->messages.size());
// save an empty file for old epochs so that log file versions are continuous
wait(saveMutationsToFile(self, popVersion, numMsg));
self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg);
self->eraseMessages(numMsg);
}
// If transitioning into NOOP mode, the buffered messages should be cleared
if (!self->pulling) {
self->messages.clear();
self->eraseMessages(self->messages.size());
}
if (popVersion > self->savedVersion && popVersion > self->popVersion) {
@@ -812,7 +861,7 @@ ACTOR Future<Void> uploadData(BackupData* self) {
}
if (self->allMessageSaved()) {
self->messages.clear();
self->eraseMessages(self->messages.size());
return Void();
}
@@ -827,6 +876,7 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
state Future<Void> logSystemChange = Void();
state Reference<ILogSystem::IPeekCursor> r;
state Version tagAt = std::max(self->pulledVersion.get(), std::max(self->startVersion, self->savedVersion));
state Arena prev;
TraceEvent("BackupWorkerPull", self->myId);
loop {
@@ -852,6 +902,15 @@ ACTOR Future<Void> pullAsyncData(BackupData* self) {
// Note we aggressively peek (uncommitted) messages, but only committed
// messages/mutations will be flushed to disk/blob in uploadData().
while (r->hasMessage()) {
if (!sameArena(prev, r->arena())) {
TraceEvent(SevDebugMemory, "BackupWorkerMemory", self->myId)
.detail("Take", r->arena().getSize())
.detail("Arena", (void*)r->arena().impl.getPtr())
.detail("Current", self->lock->activePermits());
wait(self->lock->take(TaskPriority::DefaultYield, r->arena().getSize()));
prev = r->arena();
}
self->messages.emplace_back(r->version(), r->getMessage(), r->getTags(), r->arena());
r->nextMessage();
}
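
pullAsyncData() is the take side: whenever the peek cursor moves onto a message backed by a new arena (tracked via the new `state Arena prev`), the worker first waits on lock->take() for that arena's size, so pulling stalls once BACKUP_LOCK_BYTES of unflushed data is outstanding, which is the memory bound this change is after. A condensed, self-contained sketch of that loop, mirroring the eraseMessages() walk shown earlier (toy types only; the real code waits on a FlowLock and a peek cursor):

// Sketch of the take side: charge the budget once per new backing buffer,
// before buffering the messages it contains.
#include <iostream>
#include <memory>
#include <string>
#include <utility>
#include <vector>

struct ToyArena { std::shared_ptr<int> id; size_t size = 0; };   // identity + size

static bool sameToyArena(const ToyArena& a, const ToyArena& b) {
    return a.id.get() == b.id.get();
}

int main() {
    ToyArena a{ std::make_shared<int>(0), 4096 };
    ToyArena b{ std::make_shared<int>(0), 8192 };
    // Each peeked message carries a handle to the batch buffer it lives in.
    std::vector<std::pair<ToyArena, std::string>> pulled = {
        { a, "m1" }, { a, "m2" }, { b, "m3" }
    };

    long activePermits = 0;           // the real code waits on lock->take(...) instead
    ToyArena prev;                    // empty handle: matches nothing, like 'state Arena prev'
    std::vector<std::string> messages;
    for (const auto& [arena, msg] : pulled) {
        if (!sameToyArena(prev, arena)) {       // first message backed by a new buffer
            activePermits += (long)arena.size;  // charge the whole buffer once, up front
            prev = arena;
        }
        messages.push_back(msg);
    }
    std::cout << "buffered " << messages.size() << " messages, charged "
              << activePermits << " bytes\n";   // 3 messages, 12288 bytes
}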


@@ -387,7 +387,8 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( BACKUP_TIMEOUT, 0.4 );
init( BACKUP_NOOP_POP_DELAY, 5.0 );
init( BACKUP_FILE_BLOCK_BYTES, 1024 * 1024 );
init( BACKUP_UPLOAD_DELAY, 10.0 ); if( randomize && BUGGIFY ) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 20; // TODO: Increase delay range
init( BACKUP_LOCK_BYTES, 3e9 ); if(randomize && BUGGIFY) BACKUP_LOCK_BYTES = deterministicRandom()->randomInt(1024, 4096) * 1024;
init( BACKUP_UPLOAD_DELAY, 10.0 ); if(randomize && BUGGIFY) BACKUP_UPLOAD_DELAY = deterministicRandom()->random01() * 60;
//Cluster Controller
init( CLUSTER_CONTROLLER_LOGGING_DELAY, 5.0 );
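
BACKUP_LOCK_BYTES defaults to 3e9 bytes (roughly 2.8 GiB) of buffered backup data per worker; under BUGGIFY it is shrunk to the low-megabyte range so simulation actually exercises the throttling path, and the randomized BACKUP_UPLOAD_DELAY range grows from up to 20 s to up to 60 s. A quick arithmetic check of the buggified bounds (ordinary C++, and assuming randomInt's upper bound is exclusive):

// Quick check of the randomized BACKUP_LOCK_BYTES range used under BUGGIFY:
// randomInt(1024, 4096) * 1024 yields 1 MiB up to just under 4 MiB.
#include <cstdint>
#include <iostream>

int main() {
    int64_t low = 1024LL * 1024;            // 1 MiB
    int64_t high = 4095LL * 1024;           // just under 4 MiB
    std::cout << "buggified BACKUP_LOCK_BYTES: " << low << " .. " << high << " bytes\n";
    std::cout << "default: " << 3e9 / (1024.0 * 1024 * 1024) << " GiB\n";  // ~2.79 GiB
}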


@@ -313,6 +313,7 @@ public:
double BACKUP_TIMEOUT; // master's reaction time for backup failure
double BACKUP_NOOP_POP_DELAY;
int BACKUP_FILE_BLOCK_BYTES;
int64_t BACKUP_LOCK_BYTES;
double BACKUP_UPLOAD_DELAY;
//Cluster Controller


@@ -216,6 +216,9 @@ ACTOR static Future<Void> _parsePartitionedLogFileOnLoader(
VersionedMutationsMap::iterator it;
bool inserted;
std::tie(it, inserted) = kvOps.emplace(msgVersion, MutationsVec());
// A clear mutation can be split into multiple mutations with the same (version, sub).
// See saveMutationsToFile(). Current tests only use one key range per backup, thus
// only one clear mutation is generated (i.e., always inserted).
ASSERT(inserted);
ArenaReader rd(buf.arena(), StringRef(message, msgSize), AssumeVersion(currentProtocolVersion));
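
The new comment explains why ASSERT(inserted) still holds: saveMutationsToFile() splits a ClearRange that spans several backup key ranges into sub-clears sharing one (version, sub) pair, so a partitioned-log reader could in principle see the same key twice, but current tests configure a single range per backup. A tiny illustration of how emplace on a map keyed by (version, sub) reports such a duplicate (toy types, not the real VersionedMutationsMap):

// Illustration: emplace reports whether the (version, sub) key was newly
// inserted, which is what the loader's ASSERT relies on.
#include <cassert>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

using VersionSub = std::pair<long, int>;              // (version, sub)
using MutationsVec = std::vector<std::string>;

int main() {
    std::map<VersionSub, MutationsVec> kvOps;

    auto [it1, inserted1] = kvOps.emplace(VersionSub{ 100, 0 }, MutationsVec());
    assert(inserted1);                                 // first mutation at (100, 0)
    it1->second.push_back("clear [a, c)");

    // A second sub-clear with the same (version, sub) would not insert again:
    auto [it2, inserted2] = kvOps.emplace(VersionSub{ 100, 0 }, MutationsVec());
    std::cout << std::boolalpha << "inserted again: " << inserted2 << "\n"; // false
    it2->second.push_back("clear [c, e)");             // appended to the same entry
    std::cout << "mutations at (100,0): " << it2->second.size() << "\n";    // 2
}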


@@ -399,9 +399,11 @@ struct BackupAndParallelRestoreCorrectnessWorkload : TestWorkload {
if (!self->locked && BUGGIFY) {
TraceEvent("BARW_SubmitBackup2", randomID).detail("Tag", printable(self->backupTag));
try {
// Note the "partitionedLog" must be false, because we change
// the configuration to disable backup workers before restore.
extraBackup = backupAgent.submitBackup(
cx, LiteralStringRef("file://simfdb/backups/"), deterministicRandom()->randomInt(0, 100),
self->backupTag.toString(), self->backupRanges, true, self->usePartitionedLogs);
self->backupTag.toString(), self->backupRanges, true, false);
} catch (Error& e) {
TraceEvent("BARW_SubmitBackup2Exception", randomID)
.error(e)