Increased CopyLogRange performance

Oleg Samarin 2020-06-16 13:59:47 +03:00
parent 2105778026
commit d887d5dd70
5 changed files with 118 additions and 18 deletions

View File

@ -495,6 +495,7 @@ Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version en
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid);
Future<Void> eraseLogData(Reference<ReadYourWritesTransaction> tr, Key logUidValue, Key destUidValue, Optional<Version> endVersion = Optional<Version>(), bool checkBackupUid = false, Version backupUid = 0);
Key getApplyKey( Version version, Key backupUid );
Version getLogKeyVersion(Key key);
std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key);
Future<Void> logError(Database cx, Key keyErrors, const std::string& message);
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message);
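// Usage sketch for the newly exposed getLogKeyVersion (editorial, not part of
// this diff; kv is assumed to be a KeyValueRef holding a backup mutation log key):
//   Version v = getLogKeyVersion(kv.key); // reads only the version field, so it
//   is cheaper than decodeBKMutationLogKey when the part number is not needed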

View File

@ -144,6 +144,7 @@ Version getVersionFromString(std::string const& value) {
// \xff / bklog / keyspace in a funny order for performance reasons.
// Return the ranges of keys that contain the data for the given range
// of versions.
// Requires CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE % blockSize == 0; otherwise the hash calculation will be incorrect.
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key destUidValue, int blockSize) {
Standalone<VectorRef<KeyRangeRef>> ret;
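// Sketch of the invariant stated in the comment above (hypothetical guard, not
// part of this change; ASSERT is the flow macro):
//   ASSERT(blockSize > 0 && CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE % blockSize == 0);
// If the divisibility did not hold, a single blockSize-aligned slice of versions
// could straddle two LOG_RANGE_BLOCK_SIZE hash buckets, and the one hash byte
// computed per slice would be wrong for part of that slice.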
@ -200,12 +201,16 @@ Key getApplyKey( Version version, Key backupUid ) {
return k2.withPrefix(applyLogKeys.begin);
}
Version getLogKeyVersion(Key key) {
return bigEndian64(*(int64_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t)));
}
//Given a key from one of the ranges returned by get_log_ranges,
//returns(version, part) where version is the database version number of
//the transaction log data in the value, and part is 0 for the first such
//data for a given version, 1 for the second block of data, etc.
std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key) {
return std::make_pair(bigEndian64(*(int64_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t))),
return std::make_pair(getLogKeyVersion(key),
bigEndian32(*(int32_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t))));
}
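// Key layout assumed by the two helpers above, reconstructed from their offset
// arithmetic (editorial sketch, not part of this diff):
//
//   [ prefix: backupLogPrefixBytes ][ UID: sizeof(UID) ][ hash: 1 byte ]
//   [ version: 8 bytes, big-endian ][ part: 4 bytes, big-endian ]
//
// getLogKeyVersion() decodes only the version field; decodeBKMutationLogKey()
// additionally decodes the part field that follows it.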

View File

@ -33,6 +33,8 @@
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/KeyBackedTypes.h"
#include "flow/actorcompiler.h" // has to be last include
#include <inttypes.h>
#include <map>
const Key DatabaseBackupAgent::keyAddPrefix = LiteralStringRef("add_prefix");
const Key DatabaseBackupAgent::keyRemovePrefix = LiteralStringRef("remove_prefix");
@ -539,16 +541,25 @@ namespace dbBackup {
Future<Void> execute(Database cx, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _execute(cx, tb, fb, task); };
Future<Void> finish(Reference<ReadYourWritesTransaction> tr, Reference<TaskBucket> tb, Reference<FutureBucket> fb, Reference<Task> task) { return _finish(tr, tb, fb, task); };
ACTOR static Future<Void> dumpData(Database cx, Reference<Task> task, PromiseStream<RCGroup> results, FlowLock* lock, Reference<TaskBucket> tb) {
// Store mutation data from results until the end of stream or until the timeout expires. If it breaks on the timeout, it returns the first uncopied version.
ACTOR static Future<Optional<Version>> dumpData(
Database cx, Reference<Task> task,
PromiseStream<RCGroup> results,
FlowLock* lock,
Reference<TaskBucket> tb,
double breakTime
) {
state bool endOfStream = false;
state Subspace conf = Subspace(databaseBackupPrefixRange.begin).get(BackupAgentBase::keyConfig).get(task->params[BackupAgentBase::keyConfigLogUid]);
state std::vector<Standalone<RangeResultRef>> nextMutations;
state bool isTimeoutOccurred = false;
state Optional<KeyRef> lastKey;
state Version lastVersion;
state int64_t nextMutationSize = 0;
loop{
try {
if (endOfStream && !nextMutationSize) {
return Void();
return Optional<Version>();
}
state std::vector<Standalone<RangeResultRef>> mutations = std::move(nextMutations);
@ -585,8 +596,9 @@ namespace dbBackup {
}
}
state Optional<Version> nextVersionAfterBreak;
state Transaction tr(cx);
loop{
try {
tr.setOption(FDBTransactionOptions::LOCK_AWARE);
@ -597,12 +609,22 @@ namespace dbBackup {
bool first = true;
for(auto m : mutations) {
for(auto kv : m) {
if (isTimeoutOccurred) {
Version newVersion = getLogKeyVersion(kv.key);
if (newVersion > lastVersion) {
nextVersionAfterBreak = newVersion;
break;
}
}
if(first) {
tr.addReadConflictRange(singleKeyRange(kv.key));
first = false;
TraceEvent(SevInfo, "CopyLogRangeTaskFuncDumpData50").detail("store", kv.key.removePrefix(backupLogKeys.begin).removePrefix(task->params[BackupAgentBase::destUid]).withPrefix(task->params[BackupAgentBase::keyConfigLogUid]).withPrefix(applyLogKeys.begin));
}
tr.set(kv.key.removePrefix(backupLogKeys.begin).removePrefix(task->params[BackupAgentBase::destUid]).withPrefix(task->params[BackupAgentBase::keyConfigLogUid]).withPrefix(applyLogKeys.begin), kv.value);
bytesSet += kv.expectedSize() - backupLogKeys.begin.expectedSize() + applyLogKeys.begin.expectedSize();
lastKey = kv.key;
}
}
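// The set() above is a pure prefix swap (editorial sketch of the rewrite):
//
//   source key:      backupLogKeys.begin + destUid + <hash, version, part>
//   destination key: applyLogKeys.begin + logUid + <hash, version, part>
//
// The value is copied verbatim; only the routing prefix changes, so the
// apply-log machinery finds the mutations under its own log UID.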
@ -614,6 +636,18 @@ namespace dbBackup {
wait(tr.onError(e));
}
}
if (nextVersionAfterBreak.present()) {
return nextVersionAfterBreak;
}
if (!isTimeoutOccurred && timer_monotonic() >= breakTime && lastKey.present()) {
// Timeout occurred. Continue copying mutations that share the version of
// the last copied key; the next run will filter them out based on the
// version condition, so the break lands on a clean version boundary.
lastVersion = getLogKeyVersion(lastKey.get());
isTimeoutOccurred = true;
}
}
catch (Error &e) {
if (e.code() == error_code_actor_cancelled || e.code() == error_code_backup_error)
@ -628,31 +662,83 @@ namespace dbBackup {
}
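// Contract of the new dumpData, restated for the caller (editorial summary,
// not code from this diff):
//
//   Optional<Version> r = wait(dumpData(cx, task, results, lock, tb, breakTime));
//   !r.present(): the stream was fully drained before breakTime.
//   r.present():  breakTime passed; every mutation whose version is <= the
//                 version of the last key written was still copied, and
//                 r.get() is the first uncopied version, which the caller
//                 should use as the next begin version.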
ACTOR static Future<Void> _execute(Database cx, Reference<TaskBucket> taskBucket, Reference<FutureBucket> futureBucket, Reference<Task> task) {
state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
// state Reference<FlowLock> lock(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES));
wait(checkTaskVersion(cx, task, CopyLogRangeTaskFunc::name, CopyLogRangeTaskFunc::version));
state Version beginVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyBeginVersion], Unversioned());
state Version endVersion = BinaryReader::fromStringRef<Version>(task->params[DatabaseBackupAgent::keyEndVersion], Unversioned());
state Version newEndVersion = std::min(endVersion, (((beginVersion-1) / CLIENT_KNOBS->BACKUP_BLOCK_SIZE) + 2 + (g_network->isSimulated() ? CLIENT_KNOBS->BACKUP_SIM_COPY_LOG_RANGES : 0)) * CLIENT_KNOBS->BACKUP_BLOCK_SIZE);
Version newEndVersion = std::min(
endVersion,
(
((beginVersion-1) / CLIENT_KNOBS->COPY_LOG_BLOCK_SIZE) + 1
+ CLIENT_KNOBS->COPY_LOG_BLOCKS_PER_TASK
+ (g_network->isSimulated() ? CLIENT_KNOBS->BACKUP_SIM_COPY_LOG_RANGES : 0)
) * CLIENT_KNOBS->COPY_LOG_BLOCK_SIZE
);
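// Worked example of the arithmetic above, with made-up numbers (editorial):
//   beginVersion = 250, endVersion = 1000000,
//   COPY_LOG_BLOCK_SIZE = 100, COPY_LOG_BLOCKS_PER_TASK = 1000, no simulation:
//     ((250 - 1) / 100) + 1 == 3          // first block boundary at or after beginVersion, in blocks
//     (3 + 1000) * 100      == 100300     // at most 1000 whole blocks per task
//     newEndVersion = min(1000000, 100300) == 100300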
state Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, newEndVersion, task->params[BackupAgentBase::destUid], CLIENT_KNOBS->COPY_LOG_BLOCK_SIZE);
state int nRanges = ranges.size();
TraceEvent(SevInfo, "CopyLogRangeTaskFuncExecute10").detail("beginVersion", beginVersion).detail("endVersion", endVersion).detail("newEndVersion", newEndVersion).detail("nRanges", nRanges);
state Standalone<VectorRef<KeyRangeRef>> ranges = getLogRanges(beginVersion, newEndVersion, task->params[BackupAgentBase::destUid], CLIENT_KNOBS->BACKUP_BLOCK_SIZE);
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Future<Void>> dump;
state std::vector<Reference<FlowLock>> locks;
state Version nextVersion = beginVersion;
state double breakTime = timer_monotonic() + CLIENT_KNOBS->COPY_LOG_TASK_DURATION_NANOS * 1e-9; // the knob is in nanoseconds; timer_monotonic() returns seconds
state int rangeN = 0;
loop {
if (rangeN >= nRanges)
break;
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
rc.push_back(readCommitted(taskBucket->src, results[i], Future<Void>(Void()), lock, ranges[i], decodeBKMutationLogKey, true, true, true));
dump.push_back(dumpData(cx, task, results[i], lock.getPtr(), taskBucket));
// prefetch
int prefetchTo = std::min(rangeN + CLIENT_KNOBS->COPY_LOG_PREFETCH_BLOCKS, nRanges);
for (int j = results.size(); j < prefetchTo; j++) {
results.push_back(PromiseStream<RCGroup>());
locks.push_back(Reference<FlowLock>(new FlowLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES)));
rc.push_back(readCommitted(taskBucket->src, results[j], Future<Void>(Void()), locks[j], ranges[j], decodeBKMutationLogKey, true, true, true));
}
// copy the range
Optional<Version> nextVersionBr = wait(
dumpData(
cx, task, results[rangeN], locks[rangeN].getPtr(), taskBucket, breakTime
)
);
// exit from the task if a timeout occurs
if (nextVersionBr.present()) {
nextVersion = nextVersionBr.get();
TraceEvent(SevInfo, "CopyLogRangeTaskFuncBrokenOnTimeout")
.detail("durationNanos", CLIENT_KNOBS->COPY_LOG_TASK_DURATION_NANOS)
.detail("rangeN", rangeN)
.detail("bytesWritten", Params.bytesWritten().getOrDefault(task));
// cancel prefetch
for (int j = results.size(); --j >= rangeN;)
rc[j].cancel();
break;
}
// the whole range has been dumped
nextVersion = getLogKeyVersion(ranges[rangeN].end);
rangeN++;
}
TraceEvent(SevInfo, "CopyLogRangeTaskFuncExecute20")
.detail("beginVersion", beginVersion)
.detail("endVersion", endVersion)
.detail("nextVersion", nextVersion)
.detail("bytesWritten", Params.bytesWritten().getOrDefault(task));
wait(waitForAll(dump));
if (newEndVersion < endVersion) {
task->params[CopyLogRangeTaskFunc::keyNextBeginVersion] = BinaryWriter::toValue(newEndVersion, Unversioned());
if (nextVersion < endVersion) {
task->params[CopyLogRangeTaskFunc::keyNextBeginVersion] = BinaryWriter::toValue(nextVersion, Unversioned());
}
return Void();
}
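// Shape of the bounded-prefetch pipeline implemented by the loop above
// (editorial sketch):
//
//   [0 .. rangeN)               already dumped
//   [rangeN .. results.size())  readers in flight, at most COPY_LOG_PREFETCH_BLOCKS ahead
//   [results.size() .. nRanges) not started yet
//
// Each iteration tops the window up, drains exactly one range with dumpData,
// and on timeout cancels the in-flight readers from rangeN onward before the
// task reschedules itself from nextVersion.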

View File

@ -144,6 +144,10 @@ void ClientKnobs::initialize(bool randomize) {
init( BACKUP_MAP_KEY_UPPER_LIMIT, 1e5 ); if( buggifyMapLimits ) BACKUP_MAP_KEY_UPPER_LIMIT = 30;
init( BACKUP_COPY_TASKS, 90 );
init( BACKUP_BLOCK_SIZE, LOG_RANGE_BLOCK_SIZE/10 );
init( COPY_LOG_BLOCK_SIZE, LOG_RANGE_BLOCK_SIZE ); // the maximum possible value due to the getLogRanges limitations
init( COPY_LOG_BLOCKS_PER_TASK, 1000 );
init( COPY_LOG_PREFETCH_BLOCKS, 3 );
init( COPY_LOG_TASK_DURATION_NANOS, 1e10 );
init( BACKUP_TASKS_PER_AGENT, 10 );
init( BACKUP_POLL_PROGRESS_SECONDS, 10 );
init( VERSIONS_PER_SECOND, 1e6 ); // Must be the same as SERVER_KNOBS->VERSIONS_PER_SECOND
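// Back-of-the-envelope effect of the new knob defaults, assuming
// LOG_RANGE_BLOCK_SIZE keeps its default of one second's worth of versions
// (editorial, not part of this diff):
//   COPY_LOG_BLOCK_SIZE          ~ 1 second of commit versions per block
//   COPY_LOG_BLOCKS_PER_TASK     ~ 1000 seconds of mutation log per task
//   COPY_LOG_PREFETCH_BLOCKS     reads run up to 3 blocks ahead of the copy
//   COPY_LOG_TASK_DURATION_NANOS = 1e10 ns = 10 seconds of wall-clock work
//                                  before a task breaks and reschedules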

View File

@ -139,6 +139,10 @@ public:
int BACKUP_MAP_KEY_UPPER_LIMIT;
int BACKUP_COPY_TASKS;
int BACKUP_BLOCK_SIZE;
int COPY_LOG_BLOCK_SIZE;
int COPY_LOG_BLOCKS_PER_TASK;
int COPY_LOG_PREFETCH_BLOCKS;
double COPY_LOG_TASK_DURATION_NANOS;
int BACKUP_TASKS_PER_AGENT;
int BACKUP_POLL_PROGRESS_SECONDS;
int64_t VERSIONS_PER_SECOND; // Copy of SERVER_KNOBS, as we can't link with it