foundationdb/fdbclient/BackupAgentBase.actor.cpp

1005 lines
40 KiB
C++

/*
* BackupAgentBase.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <iomanip>
#include <time.h>
#include "fdbclient/BackupAgent.actor.h"
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
#include "flow/actorcompiler.h" // has to be last include
// Formats `epochs` (seconds since the Unix epoch) as "YYYY/MM/DD.HH:MM:SS+hhmm",
// using the broken-down time produced by getLocalTime(). This is the layout that
// parseTime() reads back.
//
// Returns an empty string if the formatted time does not fit in the internal buffer.
std::string BackupAgentBase::formatTime(int64_t epochs) {
    time_t curTime = (time_t)epochs;
    char buffer[30] = { 0 };
    struct tm timeinfo;
    getLocalTime(&curTime, &timeinfo);
    // strftime() returns 0 and leaves the buffer contents unspecified when the result
    // (including the terminating NUL) does not fit, so never hand that buffer back.
    if (strftime(buffer, sizeof(buffer), "%Y/%m/%d.%H:%M:%S%z", &timeinfo) == 0) {
        return std::string();
    }
    return buffer;
}
// Parses a timestamp of the form "YYYY/MM/DD.HH:MM:SS+hhmm" (the layout written by
// formatTime()) and returns the corresponding epoch seconds.
//
// The date/time is taken from the first 19 characters and the "+hhmm"/"-hhmm" timezone
// offset from the (up to) 5 characters that follow.
//
// Returns -1 if the string is too short, the date/time cannot be parsed, the timezone
// offset cannot be parsed, or (on Windows) the local timezone cannot be determined.
int64_t BackupAgentBase::parseTime(std::string timestamp) {
    const size_t dateTimeLength = 19;

    // substr(dateTimeLength, ...) below throws std::out_of_range for shorter strings, and a
    // short string can still get past the date/time parser (e.g. non-zero-padded fields),
    // so reject it up front and report failure the same way as every other parse error.
    if (timestamp.size() < dateTimeLength) {
        return -1;
    }

    struct tm out = {};
    out.tm_isdst = -1; // This field is not set by strptime. -1 tells mktime to determine whether DST is in effect

    std::string timeOnly = timestamp.substr(0, dateTimeLength);

    // TODO:  Use std::get_time implementation for all platforms once supported
    // It would be nice to read the timezone using %z, but it seems not all get_time()
    // or strptime() implementations handle it correctly in all environments so we
    // will read the date and time independent of timezone at first and then adjust it.
#ifdef _WIN32
    std::istringstream s(timeOnly);
    s.imbue(std::locale(setlocale(LC_TIME, nullptr)));
    s >> std::get_time(&out, "%Y/%m/%d.%H:%M:%S");
    if (s.fail()) {
        return -1;
    }
#else
    if (strptime(timeOnly.c_str(), "%Y/%m/%d.%H:%M:%S", &out) == nullptr) {
        return -1;
    }
#endif

    // Read timezone offset in +/-HHMM format then convert to seconds
    const std::string tzField = timestamp.substr(dateTimeLength, 5);
    int tzHH;
    int tzMM;
    if (sscanf(tzField.c_str(), "%3d%2d", &tzHH, &tzMM) != 2) {
        return -1;
    }
    // The minutes carry the same sign as the hours. An offset such as "-0030" parses to
    // tzHH == 0, so the sign character has to be consulted as well as the sign of tzHH.
    if (tzHH < 0 || (tzHH == 0 && tzField[0] == '-')) {
        tzMM = -tzMM;
    }
    // tzOffset is the number of seconds EAST of GMT
    int tzOffset = tzHH * 60 * 60 + tzMM * 60;

    // The goal is to convert the timestamp string to epoch seconds assuming the date/time was expressed in the
    // timezone at the end of the string. However, mktime() will ONLY return epoch seconds assuming the date/time
    // is expressed in local time (based on locale / environment). mktime() will set out.tm_gmtoff when available.
    int64_t ts = mktime(&out);

    // localTZOffset is the number of seconds EAST of GMT
    long localTZOffset;
#ifdef _WIN32
    // _get_timezone() returns the number of seconds WEST of GMT
    if (_get_timezone(&localTZOffset) != 0) {
        return -1;
    }
    // Negate offset to match the orientation of tzOffset
    localTZOffset = -localTZOffset;
#else
    // tm.tm_gmtoff is the number of seconds EAST of GMT
    localTZOffset = out.tm_gmtoff;
#endif

    // Add back the difference between the local timezone assumed by mktime() and the intended timezone from the
    // input string
    ts += (localTZOffset - tzOffset);
    return ts;
}
// Definitions of the static Key constants that belong to BackupAgentBase.
// Each constant holds the literal key name given on its line; these strings are
// part of the stored data layout and must not be changed.
const Key BackupAgentBase::keyFolderId = LiteralStringRef("config_folderid");
const Key BackupAgentBase::keyBeginVersion = LiteralStringRef("beginVersion");
const Key BackupAgentBase::keyEndVersion = LiteralStringRef("endVersion");
const Key BackupAgentBase::keyPrevBeginVersion = LiteralStringRef("prevBeginVersion");
const Key BackupAgentBase::keyConfigBackupTag = LiteralStringRef("config_backup_tag");
const Key BackupAgentBase::keyConfigLogUid = LiteralStringRef("config_log_uid");
const Key BackupAgentBase::keyConfigBackupRanges = LiteralStringRef("config_backup_ranges");
const Key BackupAgentBase::keyConfigStopWhenDoneKey = LiteralStringRef("config_stop_when_done");
const Key BackupAgentBase::keyStateStop = LiteralStringRef("state_stop");
const Key BackupAgentBase::keyStateStatus = LiteralStringRef("state_status");
const Key BackupAgentBase::keyLastUid = LiteralStringRef("last_uid");
const Key BackupAgentBase::keyBeginKey = LiteralStringRef("beginKey");
const Key BackupAgentBase::keyEndKey = LiteralStringRef("endKey");
const Key BackupAgentBase::keyDrVersion = LiteralStringRef("drVersion");
const Key BackupAgentBase::destUid = LiteralStringRef("destUid");
const Key BackupAgentBase::backupStartVersion = LiteralStringRef("backupStartVersion");
const Key BackupAgentBase::keyTagName = LiteralStringRef("tagname");
const Key BackupAgentBase::keyStates = LiteralStringRef("state");
const Key BackupAgentBase::keyConfig = LiteralStringRef("config");
const Key BackupAgentBase::keyErrors = LiteralStringRef("errors");
const Key BackupAgentBase::keyRanges = LiteralStringRef("ranges");
const Key BackupAgentBase::keyTasks = LiteralStringRef("tasks");
const Key BackupAgentBase::keyFutures = LiteralStringRef("futures");
const Key BackupAgentBase::keySourceStates = LiteralStringRef("source_states");
const Key BackupAgentBase::keySourceTagName = LiteralStringRef("source_tagname");
// Copies the parameter stored under `key` from `source` into `dest`.
// Returns false (leaving `dest` untouched) when `source` is an empty reference,
// and true once the value has been copied.
bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key) {
    if (!source) {
        return false;
    }
    dest->params[key] = source->params[key];
    return true;
}
// Parses `value` as a decimal Version.
// The whole string must be consumed by the conversion; otherwise a
// GetVersionFromString warning is traced and restore_invalid_version is thrown.
Version getVersionFromString(std::string const& value) {
    Version parsed = invalidVersion;
    int charsConsumed = 0;
    const int fieldsRead = sscanf(value.c_str(), "%lld%n", (long long*)&parsed, &charsConsumed);
    if (fieldsRead == 1 && charsConsumed == value.size()) {
        return parsed;
    }
    TraceEvent(SevWarnAlways, "GetVersionFromString").detail("InvalidVersion", value);
    throw restore_invalid_version();
}
// Transaction log data is stored by the FoundationDB core in the
// "backupLogKeys" (i.e., \xff\x02/blog/) keyspace in a funny order for
// performance reasons.
//
// Returns the key ranges that contain the data for versions in
// [beginVersion, endVersion), one range per `blockSize`-sized version block.
// Each range is prefixed with backupLogKeys.begin + destUidValue + a one-byte hash
// derived from the block's LOG_RANGE_BLOCK_SIZE index, followed by the big-endian
// begin/end version clipped to the requested interval.
//
// CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE % blockSize must be 0; otherwise the hash
// calculation will be incorrect.
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion,
                                                Version endVersion,
                                                Key destUidValue,
                                                int blockSize) {
    Standalone<VectorRef<KeyRangeRef>> logRanges;

    Key logPrefix = destUidValue.withPrefix(backupLogKeys.begin);

    //TraceEvent("GetLogRanges").detail("DestUidValue", destUidValue).detail("Prefix", logPrefix);

    const int64_t firstBlock = beginVersion / blockSize;
    const int64_t pastLastBlock = (endVersion + blockSize - 1) / blockSize;

    for (int64_t block = firstBlock; block < pastLastBlock; ++block) {
        // One-byte hash of the LOG_RANGE_BLOCK_SIZE-sized block this version block falls in.
        int64_t logBlock = block * blockSize / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
        uint32_t hashInput = logBlock & 0xffffffff;
        uint8_t hashByte = (uint8_t)hashlittle(&hashInput, sizeof(uint32_t), 0);
        Key blockPrefix = StringRef(&hashByte, sizeof(uint8_t)).withPrefix(logPrefix);

        // Clip the block to the requested interval and encode both ends big-endian.
        uint64_t rangeBegin = bigEndian64(std::max(beginVersion, block * blockSize));
        uint64_t rangeEnd = bigEndian64(std::min(endVersion, (block + 1) * blockSize));
        StringRef beginBytes((uint8_t*)&rangeBegin, sizeof(uint64_t));
        StringRef endBytes((uint8_t*)&rangeEnd, sizeof(uint64_t));

        logRanges.push_back_deep(logRanges.arena(),
                                 KeyRangeRef(beginBytes.withPrefix(blockPrefix), endBytes.withPrefix(blockPrefix)));
    }

    return logRanges;
}
// Returns the key ranges under applyLogKeys.begin + backupUid that cover versions in
// [beginVersion, endVersion), one range per APPLY_BLOCK_SIZE-sized version block.
// Each range is prefixed with a one-byte hash derived from the block's
// LOG_RANGE_BLOCK_SIZE index, followed by the big-endian begin/end version clipped
// to the requested interval.
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
    Standalone<VectorRef<KeyRangeRef>> applyRanges;

    Key applyPrefix = backupUid.withPrefix(applyLogKeys.begin);

    //TraceEvent("GetLogRanges").detail("BackupUid", backupUid).detail("Prefix", applyPrefix);

    const auto applyBlockSize = CLIENT_KNOBS->APPLY_BLOCK_SIZE;
    const int64_t firstBlock = beginVersion / applyBlockSize;
    const int64_t pastLastBlock = (endVersion + applyBlockSize - 1) / applyBlockSize;

    for (int64_t block = firstBlock; block < pastLastBlock; ++block) {
        // One-byte hash of the LOG_RANGE_BLOCK_SIZE-sized block this apply block falls in.
        int64_t logBlock = block * applyBlockSize / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
        uint32_t hashInput = logBlock & 0xffffffff;
        uint8_t hashByte = (uint8_t)hashlittle(&hashInput, sizeof(uint32_t), 0);
        Key blockPrefix = StringRef(&hashByte, sizeof(uint8_t)).withPrefix(applyPrefix);

        // Clip the block to the requested interval and encode both ends big-endian.
        uint64_t rangeBegin = bigEndian64(std::max(beginVersion, block * applyBlockSize));
        uint64_t rangeEnd = bigEndian64(std::min(endVersion, (block + 1) * applyBlockSize));
        StringRef beginBytes((uint8_t*)&rangeBegin, sizeof(uint64_t));
        StringRef endBytes((uint8_t*)&rangeEnd, sizeof(uint64_t));

        applyRanges.push_back_deep(applyRanges.arena(),
                                   KeyRangeRef(beginBytes.withPrefix(blockPrefix), endBytes.withPrefix(blockPrefix)));
    }

    return applyRanges;
}
// Builds the apply-log key for `version`:
//   applyLogKeys.begin + backupUid + <1-byte hash> + <big-endian version>
// The hash is computed from the LOG_RANGE_BLOCK_SIZE block index of (version - 1).
Key getApplyKey(Version version, Key backupUid) {
    int64_t logBlock = (version - 1) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
    uint32_t hashInput = logBlock & 0xffffffff;
    uint8_t hashByte = (uint8_t)hashlittle(&hashInput, sizeof(uint32_t), 0);

    uint64_t versionBE = bigEndian64(version);
    StringRef hashBytes(&hashByte, sizeof(uint8_t));
    StringRef versionBytes((uint8_t*)&versionBE, sizeof(uint64_t));

    // Assemble from the innermost component outwards.
    Key hashedVersion = versionBytes.withPrefix(hashBytes);
    Key uidScoped = hashedVersion.withPrefix(backupUid);
    return uidScoped.withPrefix(applyLogKeys.begin);
}
// Extracts the version embedded in a backup log key.
// The big-endian 64-bit version is read from the bytes that follow
// backupLogPrefixBytes + sizeof(UID) + one hash byte.
// The caller must pass a key that is long enough to contain that field.
Version getLogKeyVersion(Key key) {
    // The field sits at an arbitrary byte offset inside the key, so copy it out with
    // memcpy instead of dereferencing a cast pointer (which may be misaligned).
    int64_t rawVersion = 0;
    memcpy(&rawVersion, key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t), sizeof(rawVersion));
    return bigEndian64(rawVersion);
}
// Given a key from one of the ranges returned by getLogRanges(),
// returns (version, part) where version is the database version number of
// the transaction log data in the value, and part is 0 for the first such
// data for a given version, 1 for the second block of data, etc.
// The caller must pass a key that is long enough to contain both fields.
std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key) {
    // The part number is the big-endian 32-bit field right after the 64-bit version.
    // Copy it out with memcpy: it is not guaranteed to be aligned inside the key.
    int32_t rawPart = 0;
    memcpy(&rawPart,
           key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t),
           sizeof(rawPart));
    return std::make_pair(getLogKeyVersion(key), bigEndian32(rawPart));
}
// Decodes one serialized backup-log value and appends the resulting mutations to `result`.
//
// Layout read from `value` (fields are copied out with memcpy in host byte order):
//   [uint64 protocolVersion][uint32 totalBytes]
//   followed by mutations, each encoded as
//   [uint32 type][uint32 len1][uint32 len2][len1 bytes param1][len2 bytes param2]
//
// A mutation is emitted only when `version` is greater than the version recorded in
// `key_version` for its key (or intersecting range, for ClearRange) and that recorded
// version is not invalidVersion. Emitted keys have `removePrefix` stripped and `addPrefix`
// prepended. A ClearRange is split per intersecting range of `key_version`; a piece whose
// end equals the end of the removePrefix keyspace gets the end of the addPrefix keyspace.
//
// arena        - arena that receives deep copies of emitted mutations
// result       - output vector the mutations are appended to
// mutationSize - incremented by expectedSize() of every emitted mutation
//
// Throws incompatible_protocol_version, restore_missing_data or restore_corrupted_data on
// malformed input (ASSERTs guard per-mutation framing); every error is traced and rethrown.
void decodeBackupLogValue(Arena& arena,
                          VectorRef<MutationRef>& result,
                          int& mutationSize,
                          StringRef value,
                          StringRef addPrefix,
                          StringRef removePrefix,
                          Version version,
                          Reference<KeyRangeMap<Version>> key_version) {
    try {
        uint64_t offset(0);

        // Make sure the protocol version is actually present before copying it out.
        if (value.size() < sizeof(uint64_t))
            throw restore_missing_data();
        uint64_t protocolVersion = 0;
        memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
        offset += sizeof(uint64_t);
        if (protocolVersion <= 0x0FDB00A200090001) {
            TraceEvent(SevError, "DecodeBackupLogValue")
                .detail("IncompatibleProtocolVersion", protocolVersion)
                .detail("ValueSize", value.size())
                .detail("Value", value);
            throw incompatible_protocol_version();
        }

        // Likewise for the total-length field that follows it.
        if (value.size() < offset + sizeof(uint32_t))
            throw restore_missing_data();
        uint32_t totalBytes = 0;
        memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
        offset += sizeof(uint32_t);
        uint32_t consumed = 0;

        if (totalBytes + offset > value.size())
            throw restore_missing_data();

        uint64_t originalOffset = offset;

        while (consumed < totalBytes) {
            // The three uint32 header fields must lie inside the value before they are read.
            ASSERT(offset + 3 * sizeof(uint32_t) <= value.size());

            uint32_t type = 0;
            memcpy(&type, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);
            uint32_t len1 = 0;
            memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);
            uint32_t len2 = 0;
            memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
            offset += sizeof(uint32_t);

            ASSERT(offset + len1 + len2 <= value.size() && isValidMutationType(type));

            MutationRef logValue;
            // Scratch arena for prefixed keys; push_back_deep copies them into `arena`.
            Arena tempArena;
            logValue.type = type;
            logValue.param1 = value.substr(offset, len1);
            offset += len1;
            logValue.param2 = value.substr(offset, len2);
            offset += len2;

            if (logValue.type == MutationRef::ClearRange) {
                KeyRangeRef range(logValue.param1, logValue.param2);
                auto ranges = key_version->intersectingRanges(range);
                for (auto r : ranges) {
                    if (version > r.value() && r.value() != invalidVersion) {
                        KeyRef minKey = std::min(r.range().end, range.end);
                        if (minKey == (removePrefix == StringRef() ? normalKeys.end : strinc(removePrefix))) {
                            // The piece runs to the end of the source keyspace: map its end to the
                            // end of the destination keyspace instead of re-prefixing it.
                            logValue.param1 = std::max(r.range().begin, range.begin);
                            if (removePrefix.size()) {
                                logValue.param1 = logValue.param1.removePrefix(removePrefix);
                            }
                            if (addPrefix.size()) {
                                logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
                            }
                            logValue.param2 = addPrefix == StringRef() ? normalKeys.end : strinc(addPrefix, tempArena);
                            result.push_back_deep(arena, logValue);
                            mutationSize += logValue.expectedSize();
                        } else {
                            logValue.param1 = std::max(r.range().begin, range.begin);
                            logValue.param2 = minKey;
                            if (removePrefix.size()) {
                                logValue.param1 = logValue.param1.removePrefix(removePrefix);
                                logValue.param2 = logValue.param2.removePrefix(removePrefix);
                            }
                            if (addPrefix.size()) {
                                logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
                                logValue.param2 = logValue.param2.withPrefix(addPrefix, tempArena);
                            }
                            result.push_back_deep(arena, logValue);
                            mutationSize += logValue.expectedSize();
                        }
                    }
                }
            } else {
                Version ver = key_version->rangeContaining(logValue.param1).value();
                //TraceEvent("ApplyMutation").detail("LogValue", logValue.toString()).detail("Version", version).detail("Ver", ver).detail("Apply", version > ver && ver != invalidVersion);
                if (version > ver && ver != invalidVersion) {
                    if (removePrefix.size()) {
                        logValue.param1 = logValue.param1.removePrefix(removePrefix);
                    }
                    if (addPrefix.size()) {
                        logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
                    }
                    result.push_back_deep(arena, logValue);
                    mutationSize += logValue.expectedSize();
                }
            }

            consumed += BackupAgentBase::logHeaderSize + len1 + len2;
        }

        ASSERT(consumed == totalBytes);
        if (value.size() != offset) {
            TraceEvent(SevError, "BA_DecodeBackupLogValue")
                .detail("UnexpectedExtraDataSize", value.size())
                .detail("Offset", offset)
                .detail("TotalBytes", totalBytes)
                .detail("Consumed", consumed)
                .detail("OriginalOffset", originalOffset);
            throw restore_corrupted_data();
        }
    } catch (Error& e) {
        TraceEvent(e.code() == error_code_restore_missing_data ? SevWarn : SevError, "BA_DecodeBackupLogValue")
            .error(e)
            .GetLastError()
            .detail("ValueSize", value.size())
            .detail("Value", value);
        throw;
    }
}
// Time (as reported by now()) at which BA_LogError was last traced; used to rate-limit the trace.
static double lastErrorTime = 0;

// Writes `message` under `keyErrors` in `tr`, with system-key access and lock awareness
// enabled on the transaction. A BA_LogError trace event is emitted at most once per
// CLIENT_KNOBS->BACKUP_ERROR_DELAY interval; the key is written on every call.
void logErrorWorker(Reference<ReadYourWritesTransaction> tr, Key keyErrors, std::string message) {
    tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
    tr->setOption(FDBTransactionOptions::LOCK_AWARE);

    const bool traceDue = now() - lastErrorTime > CLIENT_KNOBS->BACKUP_ERROR_DELAY;
    if (traceDue) {
        TraceEvent("BA_LogError").detail("Key", keyErrors).detail("Message", message);
        lastErrorTime = now();
    }

    tr->set(keyErrors, message);
}
// Records `message` under `keyErrors` by running logErrorWorker() inside a
// runRYWTransaction() on `cx`. The key and message are copied into the callback.
Future<Void> logError(Database cx, Key keyErrors, const std::string& message) {
    auto writeError = [keyErrors, message](Reference<ReadYourWritesTransaction> tr) {
        logErrorWorker(tr, keyErrors, message);
        return Future<Void>(Void());
    };
    return runRYWTransaction(cx, writeError);
}
// Convenience overload: records `message` under `keyErrors` using the database that
// `tr` belongs to. The write is delegated to the Database overload of logError();
// `tr` itself is only used to obtain the database.
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message) {
    Database cx = tr->getDatabase();
    return logError(cx, keyErrors, message);
}
// Reads `range` from `cx` in chunks and sends each chunk on `results` as a
// RangeResultWithVersion carrying the read version of the transaction that read it.
//
// Before every read the actor takes
// (limits.bytes + VALUE_SIZE_LIMIT + SYSTEM_KEY_SIZE_LIMIT) from `lock`. When a chunk is
// sent, the chunk's expectedSize() is subtracted from the releaser so that amount stays
// held; the consumer of `results` is responsible for releasing it.
//
// terminator   - when true, end_of_stream() is sent on `results` after the final chunk
// systemAccess - when true, sets ACCESS_SYSTEM_KEYS on the transaction
// lockAware    - when true, sets LOCK_AWARE on the transaction
//
// transaction_too_old resets the transaction and continues from the current `begin`;
// every other error goes through tr.onError().
ACTOR Future<Void> readCommitted(Database cx,
PromiseStream<RangeResultWithVersion> results,
Reference<FlowLock> lock,
KeyRangeRef range,
bool terminator,
bool systemAccess,
bool lockAware) {
state KeySelector begin = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
state Transaction tr(cx);
state FlowLock::Releaser releaser;
loop {
try {
// Unlimited rows; the byte limit depends on whether this is a (non sped-up) simulation.
state GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED,
(g_network->isSimulated() && !g_simulator.speedUpSimulation)
? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES
: CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
if (systemAccess)
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lockAware)
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
// add lock
// Give back whatever the previous iteration still holds, then reserve room for the next read.
releaser.release();
wait(lock->take(TaskPriority::DefaultYield,
limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT));
releaser = FlowLock::Releaser(
*lock, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
state RangeResult values = wait(tr.getRange(begin, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if (values.size() > 1 && BUGGIFY) {
values.resize(values.arena(), values.size() / 2);
values.more = true;
// Half of the time wait for this tr to expire so that the next read is at a different version
if (deterministicRandom()->random01() < 0.5)
wait(delay(6.0));
}
releaser.remaining -=
values.expectedSize(); // its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(RangeResultWithVersion(values, tr.getReadVersion().get()));
// Continue after the last key that was delivered.
if (values.size() > 0)
begin = firstGreaterThan(values.end()[-1].key);
// Finished once the read reports no more data and the limits were not reached.
if (!values.more && !limits.isReached()) {
if (terminator)
results.sendError(end_of_stream());
return Void();
}
} catch (Error& e) {
if (e.code() == error_code_transaction_too_old) {
// We are using this transaction until it's too old and then resetting to a fresh one,
// so we don't need to delay.
tr.fullReset();
} else {
wait(tr.onError(e));
}
}
}
}
// Reads `range` from `cx` and groups consecutive key/value pairs by
// groupBy(key).first, sending each completed group on `results` as an RCGroup whose
// version is the read version of the transaction at the time the group was started.
//
// After each read the actor waits for `active`, then takes
// (rangevalue.expectedSize() + rcGroup.items.expectedSize()) from `lock`. When a group
// is sent, its items' expectedSize() is subtracted from the releaser so that amount
// stays held; the consumer of `results` is responsible for releasing it.
//
// terminator   - when true, end_of_stream() is sent on `results` after the final group
// systemAccess - when true, sets ACCESS_SYSTEM_KEYS on the transaction
// lockAware    - when true, sets LOCK_AWARE on the transaction
//
// transaction_too_old resets the transaction and continues from `nextKey`;
// every other error goes through tr.onError().
ACTOR Future<Void> readCommitted(Database cx,
PromiseStream<RCGroup> results,
Future<Void> active,
Reference<FlowLock> lock,
KeyRangeRef range,
std::function<std::pair<uint64_t, uint32_t>(Key key)> groupBy,
bool terminator,
bool systemAccess,
bool lockAware) {
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
// Group currently being accumulated; version == -1 is used below as "no group started yet".
state RCGroup rcGroup = RCGroup();
// Group key of the most recently sent group; rows carrying this key are ignored.
state uint64_t skipGroup(ULLONG_MAX);
state Transaction tr(cx);
state FlowLock::Releaser releaser;
loop {
try {
// Unlimited rows; the byte limit depends on whether this is a (non sped-up) simulation.
state GetRangeLimits limits(GetRangeLimits::ROW_LIMIT_UNLIMITED,
(g_network->isSimulated() && !g_simulator.speedUpSimulation)
? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES
: CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
if (systemAccess)
tr.setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lockAware)
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
state RangeResult rangevalue = wait(tr.getRange(nextKey, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if (rangevalue.size() > 1 && BUGGIFY) {
rangevalue.resize(rangevalue.arena(), rangevalue.size() / 2);
rangevalue.more = true;
// Half of the time wait for this tr to expire so that the next read is at a different version
if (deterministicRandom()->random01() < 0.5)
wait(delay(6.0));
}
// add lock
wait(active);
// Give back whatever is still held, then reserve room for this read plus the pending group.
releaser.release();
wait(lock->take(TaskPriority::DefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());
for (auto& s : rangevalue) {
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
if (groupKey != skipGroup) {
if (rcGroup.version == -1) {
// First row of the first group.
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
} else if (rcGroup.groupKey != groupKey) {
// The group key changed: the accumulated group is complete, so send it and start a new one.
//TraceEvent("Log_ReadCommitted").detail("SendGroup0", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength",rcGroup.items[0].value.size());
// state uint32_t len(0);
// for (size_t j = 0; j < rcGroup.items.size(); ++j) {
// len += rcGroup.items[j].value.size();
//}
//TraceEvent("SendGroup").detail("GroupKey", rcGroup.groupKey).detail("Version", rcGroup.version).detail("Length", len).detail("Releaser.remaining", releaser.remaining);
releaser.remaining -=
rcGroup.items
.expectedSize(); // its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(rcGroup);
// Resume point and skip marker both refer to the group that was just sent.
nextKey = firstGreaterThan(rcGroup.items.end()[-1].key);
skipGroup = rcGroup.groupKey;
rcGroup = RCGroup();
rcGroup.version = tr.getReadVersion().get();
rcGroup.groupKey = groupKey;
}
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
if (!rangevalue.more) {
// End of the range: flush the pending group (if any) and finish.
if (rcGroup.version != -1) {
releaser.remaining -=
rcGroup.items
.expectedSize(); // its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
//TraceEvent("Log_ReadCommitted").detail("SendGroup1", rcGroup.groupKey).detail("ItemSize", rcGroup.items.size()).detail("DataLength", rcGroup.items[0].value.size());
results.send(rcGroup);
}
if (terminator)
results.sendError(end_of_stream());
return Void();
}
nextKey = firstGreaterThan(rangevalue.end()[-1].key);
} catch (Error& e) {
if (e.code() == error_code_transaction_too_old) {
// We are using this transaction until it's too old and then resetting to a fresh one,
// so we don't need to delay.
tr.fullReset();
} else {
wait(tr.onError(e));
}
}
}
}
// Convenience overload of the grouping readCommitted(): starts it with an already-ready
// `active` future and with terminator, systemAccess and lockAware all enabled.
Future<Void> readCommitted(Database cx,
                           PromiseStream<RCGroup> results,
                           Reference<FlowLock> lock,
                           KeyRangeRef range,
                           std::function<std::pair<uint64_t, uint32_t>(Key key)> groupBy) {
    const bool terminator = true;
    const bool systemAccess = true;
    const bool lockAware = true;
    return readCommitted(cx, results, Void(), lock, range, groupBy, terminator, systemAccess, lockAware);
}
// Consumes RCGroups from `results`, decodes them with decodeBackupLogValue() and submits
// the resulting mutations through `commit` in batches.
//
// For every received group, group.items.expectedSize() is released on `lock`, and the
// items' values are concatenated and decoded using group.groupKey as the version.
// A batch is closed once mutationSize reaches BACKUP_LOG_WRITE_BATCH_MAX_SIZE or the
// stream ends. Each commit request additionally
//   - sets (applyMutationsBeginRange.begin + uid) to newBeginVersion, and
//   - clears [rangeBegin, getApplyKey(newBeginVersion, uid)).
// `mutationSize` is taken from `commitLock` before sending and released when the commit
// reply arrives (the releasing future is handed to `addActor`).
//
// endVersion - if present, newBeginVersion is raised to it at end of stream when it is
//              larger than both lastVersion and newBeginVersion
//
// Returns the total mutation bytes submitted. Errors other than end_of_stream propagate.
ACTOR Future<int> dumpData(Database cx,
PromiseStream<RCGroup> results,
Reference<FlowLock> lock,
Key uid,
Key addPrefix,
Key removePrefix,
RequestStream<CommitTransactionRequest> commit,
NotifiedVersion* committedVersion,
Optional<Version> endVersion,
Key rangeBegin,
PromiseStream<Future<Void>> addActor,
FlowLock* commitLock,
Reference<KeyRangeMap<Version>> keyVersion) {
// NOTE(review): lastVersion is never assigned after this initialization in this function.
state Version lastVersion = invalidVersion;
state bool endOfStream = false;
state int totalBytes = 0;
loop {
// One iteration of this outer loop builds and submits one commit request.
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
state int mutationSize = 0;
loop {
try {
RCGroup group = waitNext(results.getFuture());
// The producer left this much held on the lock for the group; give it back now.
lock->release(group.items.expectedSize());
// Concatenate the values of all items of the group into a single value to decode.
BinaryWriter bw(Unversioned());
for (int i = 0; i < group.items.size(); ++i) {
bw.serializeBytes(group.items[i].value);
}
decodeBackupLogValue(req.arena,
req.transaction.mutations,
mutationSize,
bw.toValue(),
addPrefix,
removePrefix,
group.groupKey,
keyVersion);
newBeginVersion = group.groupKey + 1;
if (mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
break;
}
} catch (Error& e) {
if (e.code() == error_code_end_of_stream) {
if (endVersion.present() && endVersion.get() > lastVersion && endVersion.get() > newBeginVersion) {
newBeginVersion = endVersion.get();
}
// Nothing was read in this batch and there is no end version to record: done.
if (newBeginVersion == invalidVersion)
return totalBytes;
endOfStream = true;
break;
}
throw;
}
}
Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
Key rangeEnd = getApplyKey(newBeginVersion, uid);
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));
// The commit request contains no read conflict ranges, so regardless of what read version we
// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
// for our transaction to be aborted due to conflicts.
req.transaction.read_snapshot = committedVersion->get();
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
totalBytes += mutationSize;
wait(commitLock->take(TaskPriority::DefaultYield, mutationSize));
addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), mutationSize));
if (endOfStream) {
return totalBytes;
}
}
}
// Walks keyVersion->ranges() and, for every range whose version and the last retained
// version are both valid and below `endVersion`, queues a ClearRange of that range's
// begin key under (applyMutationsKeyVersionMapRange.begin + uid).
// If anything was queued, the (negative) number of removed entries is added to the
// counter key (applyMutationsKeyVersionCountRange.begin + uid) with an AddValue mutation
// and the request is submitted through `commit`, throttled by `commitLock`.
// The releasing future for the commit is handed to `addActor`.
ACTOR Future<Void> coalesceKeyVersionCache(Key uid,
Version endVersion,
Reference<KeyRangeMap<Version>> keyVersion,
RequestStream<CommitTransactionRequest> commit,
NotifiedVersion* committedVersion,
PromiseStream<Future<Void>> addActor,
FlowLock* commitLock) {
// -1000 is the "no range seen yet" sentinel tested in the loop below.
Version lastVersion = -1000;
// Decremented once per removed entry, so it is <= 0.
int64_t removed = 0;
state CommitTransactionRequest req;
state int64_t mutationSize = 0;
Key mapPrefix = uid.withPrefix(applyMutationsKeyVersionMapRange.begin);
for (auto it : keyVersion->ranges()) {
if (lastVersion == -1000) {
lastVersion = it.value();
} else {
Version ver = it.value();
if (ver < endVersion && lastVersion < endVersion && ver != invalidVersion &&
lastVersion != invalidVersion) {
Key removeKey = it.range().begin.withPrefix(mapPrefix);
Key removeEnd = keyAfter(removeKey);
req.transaction.mutations.push_back_deep(req.arena,
MutationRef(MutationRef::ClearRange, removeKey, removeEnd));
mutationSize += removeKey.size() + removeEnd.size();
removed--;
} else {
lastVersion = ver;
}
}
}
if (removed != 0) {
Key countKey = uid.withPrefix(applyMutationsKeyVersionCountRange.begin);
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(countKey));
// push_back_deep copies the 8 bytes of `removed` into req.arena before the wait below.
req.transaction.mutations.push_back_deep(
req.arena, MutationRef(MutationRef::AddValue, countKey, StringRef((uint8_t*)&removed, 8)));
req.transaction.read_snapshot = committedVersion->get();
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
wait(commitLock->take(TaskPriority::DefaultYield, mutationSize));
addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), mutationSize));
}
return Void();
}
// Applies logged mutations for versions from `beginVersion` up to `*endVersion`.
//
// Each iteration picks a window [beginVersion, newEndVersion) aligned to APPLY_BLOCK_SIZE,
// starts one grouping readCommitted() per range returned by getApplyRanges(), and runs
// dumpData() over those ranges one after another; coalesceKeyVersionCache() follows.
// The number of blocks per window is APPLY_MAX_LOCK_BYTES / maxBytes (at least 1), where
// maxBytes decays by APPLY_MAX_DECAY_RATE each iteration and grows with the byte counts
// reported by dumpData().
//
// endVersion is a pointer and is re-read on every check, so the target may change while
// the actor runs. The actor returns once beginVersion >= *endVersion still holds after
// the whole commitLock has been acquired.
//
// Any error is traced as ApplyMutationsError (SevWarnAlways for restore_missing_data,
// SevError otherwise) and rethrown.
ACTOR Future<Void> applyMutations(Database cx,
Key uid,
Key addPrefix,
Key removePrefix,
Version beginVersion,
Version* endVersion,
RequestStream<CommitTransactionRequest> commit,
NotifiedVersion* committedVersion,
Reference<KeyRangeMap<Version>> keyVersion) {
state FlowLock commitLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
state PromiseStream<Future<Void>> addActor;
// Collects the commit-release futures sent on addActor; checked for errors below.
state Future<Void> error = actorCollection(addActor.getFuture());
state int maxBytes = CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES;
keyVersion->insert(metadataVersionKey, 0);
try {
loop {
if (beginVersion >= *endVersion) {
// Take and release the full capacity of commitLock, then re-check the target.
// NOTE(review): presumably this waits for in-flight commits to finish - confirm.
wait(commitLock.take(TaskPriority::DefaultYield, CLIENT_KNOBS->BACKUP_LOCK_BYTES));
commitLock.release(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
if (beginVersion >= *endVersion) {
return Void();
}
}
int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
// End of this window: rangeCount apply blocks past the block containing beginVersion, capped at *endVersion.
state Version newEndVersion = std::min(*endVersion,
((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) *
CLIENT_KNOBS->APPLY_BLOCK_SIZE);
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Reference<FlowLock>> locks;
// Start one reader per range; each gets its own result stream and its own lock.
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
locks.push_back(makeReference<FlowLock>(
std::max(CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / ranges.size(), CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES)));
rc.push_back(readCommitted(cx, results[i], locks[i], ranges[i], decodeBKMutationLogKey));
}
maxBytes = std::max<int>(maxBytes * CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);
for (idx = 0; idx < ranges.size(); ++idx) {
// Only the last range is given newEndVersion as its end version.
int bytes = wait(dumpData(cx,
results[idx],
locks[idx],
uid,
addPrefix,
removePrefix,
commit,
committedVersion,
idx == ranges.size() - 1 ? newEndVersion : Optional<Version>(),
ranges[idx].begin,
addActor,
&commitLock,
keyVersion));
maxBytes = std::max<int>(CLIENT_KNOBS->APPLY_MAX_INCREASE_FACTOR * bytes, maxBytes);
// Surface a failure of any outstanding commit.
if (error.isError())
throw error.getError();
}
wait(coalesceKeyVersionCache(
uid, newEndVersion, keyVersion, commit, committedVersion, addActor, &commitLock));
beginVersion = newEndVersion;
if (BUGGIFY) {
wait(delay(2.0));
}
}
} catch (Error& e) {
TraceEvent(e.code() == error_code_restore_missing_data ? SevWarnAlways : SevError, "ApplyMutationsError")
.error(e);
throw;
}
}
// Clears mutation-log data and version-history bookkeeping for the backup/DR identified
// by `logUidValue` that writes to destination `destUidValue`, using transaction `tr`.
// The caller is responsible for committing `tr`; this actor only reads and stages writes.
//
// endVersion     - if present, the backup's latest-version entry is moved to this version
//                  and log data below it may be cleared; if absent, the backup's version
//                  history entry is removed
// checkBackupUid - if true, nothing is done when the stored folder id for `logUidValue`
//                  is greater than `backupUid`
//
// Nothing is done when `destUidValue` is empty or when no version-history entry exists
// for `logUidValue`. Log ranges are cleared only when no other backup/DR sharing the
// destination has a version less than or equal to this backup's begin version.
ACTOR static Future<Void> _eraseLogData(Reference<ReadYourWritesTransaction> tr,
Key logUidValue,
Key destUidValue,
Optional<Version> endVersion,
bool checkBackupUid,
Version backupUid) {
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
state Key backupLatestVersionsKey = logUidValue.withPrefix(backupLatestVersionsPath);
if (!destUidValue.size()) {
return Void();
}
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (checkBackupUid) {
Subspace sourceStates =
Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keySourceStates).get(logUidValue);
Optional<Value> v = wait(tr->get(sourceStates.pack(DatabaseBackupAgent::keyFolderId)));
if (v.present() && BinaryReader::fromStringRef<Version>(v.get(), Unversioned()) > backupUid)
return Void();
}
// All version-history entries stored under this destination.
state RangeResult backupVersions = wait(
tr->getRange(KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
// Make sure version history key does exist and lower the beginVersion if needed
state Version currBeginVersion = invalidVersion;
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currLogUidValue == logUidValue) {
currBeginVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
break;
}
}
// Do not clear anything if version history key cannot be found
if (currBeginVersion == invalidVersion) {
return Void();
}
state Version currEndVersion = std::numeric_limits<Version>::max();
if (endVersion.present()) {
currEndVersion = std::min(currEndVersion, endVersion.get());
}
// Upper bound of what may be cleared: the smallest version of any other entry above ours, capped at currEndVersion.
state Version nextSmallestVersion = currEndVersion;
bool clearLogRangesRequired = true;
// More than one backup/DR with the same range
if (backupVersions.size() > 1) {
for (auto backupVersion : backupVersions) {
Key currLogUidValue = backupVersion.key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
Version currVersion = BinaryReader::fromStringRef<Version>(backupVersion.value, Unversioned());
if (currLogUidValue == logUidValue) {
continue;
} else if (currVersion > currBeginVersion) {
nextSmallestVersion = std::min(currVersion, nextSmallestVersion);
} else {
// If we can find a version less than or equal to beginVersion, clearing log ranges is not required
clearLogRangesRequired = false;
break;
}
}
}
if (endVersion.present() || backupVersions.size() != 1 || BUGGIFY) {
if (!endVersion.present()) {
// Clear current backup version history
tr->clear(backupLatestVersionsKey);
if (backupVersions.size() == 1) {
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
}
} else {
// Update current backup latest version
tr->set(backupLatestVersionsKey, BinaryWriter::toValue<Version>(currEndVersion, Unversioned()));
}
// Clear log ranges if needed
if (clearLogRangesRequired) {
// When the span covers at least 255 LOG_RANGE_BLOCK_SIZE blocks, clear [0, nextSmallestVersion)
// under every possible one-byte hash prefix instead of enumerating individual log ranges.
if ((nextSmallestVersion - currBeginVersion) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE >=
std::numeric_limits<uint8_t>::max() ||
BUGGIFY) {
Key baLogRangePrefix = destUidValue.withPrefix(backupLogKeys.begin);
for (int h = 0; h <= std::numeric_limits<uint8_t>::max(); h++) {
uint64_t bv = bigEndian64(Version(0));
uint64_t ev = bigEndian64(nextSmallestVersion);
uint8_t h1 = h;
Key vblockPrefix = StringRef(&h1, sizeof(uint8_t)).withPrefix(baLogRangePrefix);
tr->clear(KeyRangeRef(StringRef((uint8_t*)&bv, sizeof(uint64_t)).withPrefix(vblockPrefix),
StringRef((uint8_t*)&ev, sizeof(uint64_t)).withPrefix(vblockPrefix)));
}
} else {
Standalone<VectorRef<KeyRangeRef>> ranges =
getLogRanges(currBeginVersion, nextSmallestVersion, destUidValue);
for (auto& range : ranges) {
tr->clear(range);
}
}
}
} else {
// Reached only when this is the sole entry and no end version was given: remove everything.
// Clear version history
tr->clear(prefixRange(backupLatestVersionsPath));
// Clear everything under blog/[destUid]
tr->clear(prefixRange(destUidValue.withPrefix(backupLogKeys.begin)));
// Disable committing mutations into blog
tr->clear(prefixRange(destUidValue.withPrefix(logRangesRange.begin)));
}
// The last backup for this destination is going away: drop lookup entries that map to destUidValue.
if (!endVersion.present() && backupVersions.size() == 1) {
RangeResult existingDestUidValues =
wait(tr->getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY));
for (auto it : existingDestUidValues) {
if (it.value == destUidValue) {
tr->clear(it.key);
}
}
}
return Void();
}
// Public entry point for erasing log data: forwards every argument unchanged to the
// file-local _eraseLogData() actor and returns its future.
Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr,
                          Key logUidValue,
                          Key destUidValue,
                          Optional<Version> endVersion,
                          bool checkBackupUid,
                          Version backupUid) {
    Future<Void> erased = _eraseLogData(tr, logUidValue, destUidValue, endVersion, checkBackupUid, backupUid);
    return erased;
}
// Reports, via printf, how far behind each backup/DR tag writing to `destUidValue` is,
// and optionally erases the log data of the tag with the smallest version.
//
// For each version-history entry under the destination, the tag is classified as a DR,
// a backup, both, or unknown, depending on which of two status keys exist, and its lag
// is printed in hours ((readVer - version) / (3600 * CORE_VERSIONSPERSECOND)). Each tag
// is reported only once across retries (tracked in loggedLogUids).
//
// deleteData - when true and the oldest tag is more than MIN_CLEANUP_SECONDS behind, its
//              log data is erased with eraseLogData() and the transaction is committed;
//              when false, only a message describing what would happen is printed
//
// Errors are retried through tr->onError().
ACTOR Future<Void> cleanupLogMutations(Database cx, Value destUidValue, bool deleteData) {
state Key backupLatestVersionsPath = destUidValue.withPrefix(backupLatestVersionsPrefix);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
// Tag chosen for removal on an earlier attempt; a retry will only remove that same tag.
state Optional<Key> removingLogUid;
// Tags already reported, so a retry does not print them again.
state std::set<Key> loggedLogUids;
loop {
try {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state RangeResult backupVersions = wait(tr->getRange(
KeyRangeRef(backupLatestVersionsPath, strinc(backupLatestVersionsPath)), CLIENT_KNOBS->TOO_MANY));
state Version readVer = tr->getReadVersion().get();
// Smallest version seen and the tag it belongs to.
state Version minVersion = std::numeric_limits<Version>::max();
state Key minVersionLogUid;
state int backupIdx = 0;
for (; backupIdx < backupVersions.size(); backupIdx++) {
state Version currVersion =
BinaryReader::fromStringRef<Version>(backupVersions[backupIdx].value, Unversioned());
state Key currLogUid =
backupVersions[backupIdx].key.removePrefix(backupLatestVersionsPrefix).removePrefix(destUidValue);
if (currVersion < minVersion) {
minVersionLogUid = currLogUid;
minVersion = currVersion;
}
if (!loggedLogUids.count(currLogUid)) {
// Look up a DR status key and a file-backup state key for this tag in parallel.
state Future<Optional<Value>> foundDRKey = tr->get(Subspace(databaseBackupPrefixRange.begin)
.get(BackupAgentBase::keySourceStates)
.get(currLogUid)
.pack(DatabaseBackupAgent::keyStateStatus));
state Future<Optional<Value>> foundBackupKey =
tr->get(Subspace(currLogUid.withPrefix(LiteralStringRef("uid->config/"))
.withPrefix(fileBackupPrefixRange.begin))
.pack(LiteralStringRef("stateEnum")));
wait(success(foundDRKey) && success(foundBackupKey));
if (foundDRKey.get().present() && foundBackupKey.get().present()) {
printf("WARNING: Found a tag that looks like both a backup and a DR. This tag is %.4f hours "
"behind.\n",
(readVer - currVersion) / (3600.0 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if (foundDRKey.get().present() && !foundBackupKey.get().present()) {
printf("Found a DR that is %.4f hours behind.\n",
(readVer - currVersion) / (3600.0 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if (!foundDRKey.get().present() && foundBackupKey.get().present()) {
printf("Found a Backup that is %.4f hours behind.\n",
(readVer - currVersion) / (3600.0 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else {
printf("WARNING: Found an unknown tag that is %.4f hours behind.\n",
(readVer - currVersion) / (3600.0 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
}
loggedLogUids.insert(currLogUid);
}
}
if (deleteData) {
// Delete only if the oldest tag is far enough behind and, on a retry, is still the tag picked before.
if (readVer - minVersion > CLIENT_KNOBS->MIN_CLEANUP_SECONDS * CLIENT_KNOBS->CORE_VERSIONSPERSECOND &&
(!removingLogUid.present() || minVersionLogUid == removingLogUid.get())) {
removingLogUid = minVersionLogUid;
wait(eraseLogData(tr, minVersionLogUid, destUidValue));
wait(tr->commit());
printf("\nSuccessfully removed the tag that was %.4f hours behind.\n\n",
(readVer - minVersion) / (3600.0 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else if (removingLogUid.present() && minVersionLogUid != removingLogUid.get()) {
// The oldest tag changed between attempts, so an earlier commit may already have gone through.
printf("\nWARNING: The oldest tag was possibly removed, run again without `--delete_data' to "
"check.\n\n");
} else {
printf("\nWARNING: Did not delete data because the tag is not at least %.4f hours behind. Change "
"`--min_cleanup_seconds' to adjust this threshold.\n\n",
CLIENT_KNOBS->MIN_CLEANUP_SECONDS / 3600.0);
}
} else if (readVer - minVersion >
CLIENT_KNOBS->MIN_CLEANUP_SECONDS * CLIENT_KNOBS->CORE_VERSIONSPERSECOND) {
printf("\nPassing `--delete_data' would delete the tag that is %.4f hours behind.\n\n",
(readVer - minVersion) / (3600.0 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
} else {
printf("\nPassing `--delete_data' would not delete the tag that is %.4f hours behind. Change "
"`--min_cleanup_seconds' to adjust the cleanup threshold.\n\n",
(readVer - minVersion) / (3600.0 * CLIENT_KNOBS->CORE_VERSIONSPERSECOND));
}
return Void();
} catch (Error& e) {
wait(tr->onError(e));
}
}
}
// Runs cleanupLogMutations() for every destination UID found under destUidLookupPrefix.
// The destinations are processed one after another; `deleteData` is passed through
// unchanged. Errors are retried through tr->onError(), which restarts the whole scan.
ACTOR Future<Void> cleanupBackup(Database cx, bool deleteData) {
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
loop {
try {
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
state RangeResult destUids = wait(
tr->getRange(KeyRangeRef(destUidLookupPrefix, strinc(destUidLookupPrefix)), CLIENT_KNOBS->TOO_MANY));
// The value of each lookup entry is the destination UID to clean up.
for (auto destUid : destUids) {
wait(cleanupLogMutations(cx, destUid.value, deleteData));
}
return Void();
} catch (Error& e) {
wait(tr->onError(e));
}
}
}