Add mutations state variable to hold on to memory

This commit is contained in:
Jingyu Zhou 2020-02-12 10:02:27 -08:00
parent a13d4e9bb6
commit 237f0c35cd
1 changed files with 47 additions and 74 deletions

View File

@ -44,6 +44,25 @@ struct VersionedMessage {
: version(v), message(m), tags(t), arena(a) {}
const Version getVersion() const { return version.version; }
const uint32_t getSubVersion() const { return version.sub; }
// Returns true if the message is a mutation that should be backuped, i.e.,
// either key is not in system key space or is not a metadataVersionKey.
bool isBackupMessage(MutationRef* m) const {
for (Tag tag : tags) {
if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) {
return false; // skip Txs mutations
}
}
BinaryReader reader(message.begin(), message.size(), AssumeVersion(currentProtocolVersion));
// Return false for LogProtocolMessage.
if (LogProtocolMessage::isNextIn(reader)) return false;
reader >> *m;
TraceEvent("KeyDebug").detail("M", m->toString());
return normalKeys.contains(m->param1) || m->param1 == metadataVersionKey;
}
};
struct BackupData {
@ -317,30 +336,6 @@ ACTOR Future<Void> saveProgress(BackupData* self, Version backupVersion) {
}
}
// Returns true if the message is a mutation that should be backuped, i.e.,
// either key is not in system key space or is not a metadataVersionKey.
bool isBackupMessage(const VersionedMessage& msg, MutationRef* m) {
for (Tag tag : msg.tags) {
if (tag.locality == tagLocalitySpecial || tag.locality == tagLocalityTxs) {
return false; // skip Txs mutations
}
}
BinaryReader reader(msg.message.begin(), msg.message.size(), AssumeVersion(currentProtocolVersion));
// Return false for LogProtocolMessage.
if (LogProtocolMessage::isNextIn(reader)) return false;
reader >> *m;
// check for metadataVersionKey and special metadata mutations
if (!normalKeys.contains(m->param1) && m->param1 != metadataVersionKey) {
return false;
}
return true;
}
// Return a block of contiguous padding bytes, growing if needed.
static Value makePadding(int size) {
static Value pad;
@ -352,28 +347,19 @@ static Value makePadding(int size) {
return pad.substr(0, size);
}
ACTOR Future<Void> addMutation(BackupData* self, Reference<IBackupFile> logFile, int idx, MutationRef m,
int64_t* blockEnd, int blockSize, bool useM) {
state Standalone<StringRef> keep;
state StringRef message;
if (useM) {
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
wr << m;
keep = wr.toValue();
message = keep;
} else {
message = self->messages[idx].message;
}
state int bytes = sizeof(Version) + sizeof(uint32_t) + sizeof(int) + message.size();
// Write a mutation to a log file. Note the mutation can be different from
// message.message for clear mutations.
ACTOR Future<Void> addMutation(Reference<IBackupFile> logFile, VersionedMessage message, StringRef mutation,
int64_t* blockEnd, int blockSize) {
state int bytes = sizeof(Version) + sizeof(uint32_t) + sizeof(int) + mutation.size();
// Convert to big Endianness for version.version, version.sub, and msgSize
// The decoder assumes 0xFF is the end, so little endian can easily be
// mistaken as the end. In contrast, big endian for version almost guarantee
// the first byte is not 0xFF (should always be 0x00).
BinaryWriter wr(Unversioned());
wr << bigEndian64(self->messages[idx].version.version) << bigEndian32(self->messages[idx].version.sub)
<< bigEndian32(message.size());
wr << bigEndian64(message.version.version) << bigEndian32(message.version.sub)
<< bigEndian32(mutation.size());
state Standalone<StringRef> header = wr.toValue();
// Start a new block if needed
@ -390,7 +376,7 @@ ACTOR Future<Void> addMutation(BackupData* self, Reference<IBackupFile> logFile,
}
wait(logFile->append((void*)header.begin(), header.size()));
wait(logFile->append(message.begin(), message.size()));
wait(logFile->append(mutation.begin(), mutation.size()));
return Void();
}
@ -404,7 +390,8 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
state std::vector<int64_t> blockEnds;
state std::set<UID> activeUids; // active Backups' UIDs
state KeyRangeMap<std::set<int>> keyRangeMap; // range to index in logFileFutures, logFiles, & blockEnds
state MutationRef m;
state std::vector<Standalone<StringRef>> mutations;
state int idx;
for (auto it = self->backups.begin(); it != self->backups.end();) {
if (!it->second.isRunning()) {
@ -425,6 +412,12 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
it->second.lastSavedVersion, popVersion + 1, blockSize, self->tag.id));
it++;
}
if (activeUids.empty()) {
// stop early if there is no active backups
self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg);
TraceEvent("BackupWorkerSkip", self->myId).detail("Count", numMsg);
return Void();
}
keyRangeMap.coalesce(allKeys);
wait(waitForAll(logFileFutures));
@ -437,31 +430,37 @@ ACTOR Future<Void> saveMutationsToFile(BackupData* self, Version popVersion, int
.detail("File", file->getFileName());
}
state int idx = 0;
blockEnds = std::vector<int64_t>(logFiles.size(), 0);
for (; idx < numMsg; idx++) {
if (!isBackupMessage(self->messages[idx], &m)) continue;
for (idx = 0; idx < numMsg; idx++) {
auto& message = self->messages[idx];
MutationRef m;
if (!message.isBackupMessage(&m)) continue;
std::vector<Future<Void>> adds;
if (m.type != MutationRef::Type::ClearRange) {
for (int index : keyRangeMap[m.param1]) {
adds.push_back(addMutation(self, logFiles[index], idx, m, &blockEnds[index], blockSize, false));
adds.push_back(addMutation(logFiles[index], message, message.message, &blockEnds[index], blockSize));
}
} else {
KeyRangeRef mutationRange(m.param1, m.param2);
KeyRangeRef intersectionRange;
KeyRange intersectionRange;
// Find intersection ranges and create mutations for sub-ranges
for (auto range : keyRangeMap.intersectingRanges(mutationRange)) {
const auto& subrange = range.range();
intersectionRange = mutationRange & subrange;
MutationRef subm(MutationRef::Type::ClearRange, intersectionRange.begin, intersectionRange.end);
BinaryWriter wr(AssumeVersion(currentProtocolVersion));
wr << subm;
mutations.push_back(wr.toValue());
for (int index : range.value()) {
adds.push_back(addMutation(self, logFiles[index], idx, subm, &blockEnds[index], blockSize, true));
adds.push_back(
addMutation(logFiles[index], message, mutations.back(), &blockEnds[index], blockSize));
}
}
}
wait(waitForAll(adds));
mutations.clear();
}
self->messages.erase(self->messages.begin(), self->messages.begin() + numMsg);
@ -692,29 +691,3 @@ ACTOR Future<Void> backupWorker(BackupInterface interf, InitializeBackupRequest
return Void();
}
#include "flow/UnitTest.h"
TEST_CASE("/BackupWorker/Range") {
KeyRangeMap<std::set<int>> rangeMap;
for (int index : rangeMap[LiteralStringRef("1")]) {
ASSERT(false);
printf("index %d\n", index);
}
for (auto& logRange : rangeMap.modify(normalKeys)) {
logRange->value().insert(1);
}
for (auto& logRange : rangeMap.modify(singleKeyRange(metadataVersionKey))) {
logRange->value().insert(1);
}
for (auto& logRange : rangeMap.modify(KeyRange(KeyRangeRef(LiteralStringRef("0"), LiteralStringRef("5"))))) {
logRange->value().insert(2);
}
// rangeMap.coalesce(allKeys);
for (int index : rangeMap[LiteralStringRef("1")]) {
printf("index %d\n", index);
}
return Void();
}