foundationdb/fdbclient/BackupAgentBase.actor.cpp

705 lines
30 KiB
C++
Raw Normal View History

2017-05-26 04:48:44 +08:00
/*
* BackupAgentBase.actor.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "BackupAgent.h"
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
const Key BackupAgentBase::keyFolderId = LiteralStringRef("config_folderid");
const Key BackupAgentBase::keyBeginVersion = LiteralStringRef("beginVersion");
const Key BackupAgentBase::keyEndVersion = LiteralStringRef("endVersion");
const Key BackupAgentBase::keyConfigBackupTag = LiteralStringRef("config_backup_tag");
const Key BackupAgentBase::keyConfigLogUid = LiteralStringRef("config_log_uid");
const Key BackupAgentBase::keyConfigBackupRanges = LiteralStringRef("config_backup_ranges");
const Key BackupAgentBase::keyConfigStopWhenDoneKey = LiteralStringRef("config_stop_when_done");
const Key BackupAgentBase::keyStateStop = LiteralStringRef("state_stop");
const Key BackupAgentBase::keyStateStatus = LiteralStringRef("state_status");
const Key BackupAgentBase::keyLastUid = LiteralStringRef("last_uid");
const Key BackupAgentBase::keyBeginKey = LiteralStringRef("beginKey");
const Key BackupAgentBase::keyEndKey = LiteralStringRef("endKey");
const Key BackupAgentBase::keyTagName = LiteralStringRef("tagname");
const Key BackupAgentBase::keyStates = LiteralStringRef("state");
const Key BackupAgentBase::keyConfig = LiteralStringRef("config");
const Key BackupAgentBase::keyErrors = LiteralStringRef("errors");
const Key BackupAgentBase::keyRanges = LiteralStringRef("ranges");
const Key BackupAgentBase::keyTasks = LiteralStringRef("tasks");
const Key BackupAgentBase::keyFutures = LiteralStringRef("futures");
const Key BackupAgentBase::keySourceStates = LiteralStringRef("source_states");
const Key BackupAgentBase::keySourceTagName = LiteralStringRef("source_tagname");
bool copyParameter(Reference<Task> source, Reference<Task> dest, Key key) {
if (source) {
dest->params[key] = source->params[key];
return true;
}
return false;
}
Version getVersionFromString(std::string const& value) {
Version version(-1);
int n = 0;
if (sscanf(value.c_str(), "%lld%n", (long long*)&version, &n) != 1 || n != value.size()) {
TraceEvent(SevWarnAlways, "getVersionFromString").detail("InvalidVersion", value);
throw restore_invalid_version();
}
return version;
}
// Transaction log data is stored by the FoundationDB core in the
// \xff / bklog / keyspace in a funny order for performance reasons.
// Return the ranges of keys that contain the data for the given range
// of versions.
Standalone<VectorRef<KeyRangeRef>> getLogRanges(Version beginVersion, Version endVersion, Key backupUid, int blockSize) {
Standalone<VectorRef<KeyRangeRef>> ret;
Key baLogRangePrefix = backupUid.withPrefix(backupLogKeys.begin);
//TraceEvent("getLogRanges").detail("backupUid", backupUid).detail("prefix", printable(StringRef(baLogRangePrefix)));
for (int64_t vblock = beginVersion / blockSize; vblock < (endVersion + blockSize - 1) / blockSize; ++vblock) {
int64_t tb = vblock * blockSize / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
uint64_t bv = bigEndian64(std::max(beginVersion, vblock * blockSize));
uint64_t ev = bigEndian64(std::min(endVersion, (vblock + 1) * blockSize));
uint32_t data = tb & 0xffffffff;
uint8_t hash = (uint8_t)hashlittle(&data, sizeof(uint32_t), 0);
Key vblockPrefix = StringRef(&hash, sizeof(uint8_t)).withPrefix(baLogRangePrefix);
ret.push_back_deep(ret.arena(), KeyRangeRef(StringRef((uint8_t*)&bv, sizeof(uint64_t)).withPrefix(vblockPrefix),
StringRef((uint8_t*)&ev, sizeof(uint64_t)).withPrefix(vblockPrefix)));
}
return ret;
}
Standalone<VectorRef<KeyRangeRef>> getApplyRanges(Version beginVersion, Version endVersion, Key backupUid) {
Standalone<VectorRef<KeyRangeRef>> ret;
Key baLogRangePrefix = backupUid.withPrefix(applyLogKeys.begin);
//TraceEvent("getLogRanges").detail("backupUid", backupUid).detail("prefix", printable(StringRef(baLogRangePrefix)));
for (int64_t vblock = beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE; vblock < (endVersion + CLIENT_KNOBS->APPLY_BLOCK_SIZE - 1) / CLIENT_KNOBS->APPLY_BLOCK_SIZE; ++vblock) {
int64_t tb = vblock * CLIENT_KNOBS->APPLY_BLOCK_SIZE / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
uint64_t bv = bigEndian64(std::max(beginVersion, vblock * CLIENT_KNOBS->APPLY_BLOCK_SIZE));
uint64_t ev = bigEndian64(std::min(endVersion, (vblock + 1) * CLIENT_KNOBS->APPLY_BLOCK_SIZE));
uint32_t data = tb & 0xffffffff;
uint8_t hash = (uint8_t)hashlittle(&data, sizeof(uint32_t), 0);
Key vblockPrefix = StringRef(&hash, sizeof(uint8_t)).withPrefix(baLogRangePrefix);
ret.push_back_deep(ret.arena(), KeyRangeRef(StringRef((uint8_t*)&bv, sizeof(uint64_t)).withPrefix(vblockPrefix),
StringRef((uint8_t*)&ev, sizeof(uint64_t)).withPrefix(vblockPrefix)));
}
return ret;
}
Key getApplyKey( Version version, Key backupUid ) {
int64_t vblock = (version-1) / CLIENT_KNOBS->LOG_RANGE_BLOCK_SIZE;
uint64_t v = bigEndian64(version);
uint32_t data = vblock & 0xffffffff;
uint8_t hash = (uint8_t)hashlittle(&data, sizeof(uint32_t), 0);
Key k1 = StringRef((uint8_t*)&v, sizeof(uint64_t)).withPrefix(StringRef(&hash, sizeof(uint8_t)));
Key k2 = k1.withPrefix(backupUid);
return k2.withPrefix(applyLogKeys.begin);
}
//Given a key from one of the ranges returned by get_log_ranges,
//returns(version, part) where version is the database version number of
//the transaction log data in the value, and part is 0 for the first such
//data for a given version, 1 for the second block of data, etc.
std::pair<uint64_t, uint32_t> decodeBKMutationLogKey(Key key) {
return std::make_pair(bigEndian64(*(int64_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t))),
bigEndian32(*(int32_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t))));
}
// value is an iterable representing all of the transaction log data for
// a given version.Returns an iterable(generator) yielding a tuple for
// each mutation in the log.At present, all mutations are represented as
// (type, param1, param2) where type is an integer and param1 and param2 are byte strings
Standalone<VectorRef<MutationRef>> decodeBackupLogValue(StringRef value) {
try {
uint64_t offset(0);
uint64_t protocolVersion = 0;
memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
offset += sizeof(uint64_t);
if (protocolVersion <= 0x0FDB00A200090001){
TraceEvent(SevError, "decodeBackupLogValue").detail("incompatible_protocol_version", protocolVersion)
.detail("valueSize", value.size()).detail("value", printable(value));
throw incompatible_protocol_version();
}
Standalone<VectorRef<MutationRef>> result;
uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t consumed = 0;
if(totalBytes + offset > value.size())
throw restore_missing_data();
int originalOffset = offset;
while (consumed < totalBytes){
uint32_t type = 0;
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
MutationRef logValue;
logValue.type = type;
logValue.param1 = value.substr(offset, len1);
offset += len1;
logValue.param2 = value.substr(offset, len2);
offset += len2;
result.push_back_deep(result.arena(), logValue);
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
}
ASSERT(consumed == totalBytes);
if (value.size() != offset) {
TraceEvent(SevError, "BA_decodeBackupLogValue").detail("unexpected_extra_data_size", value.size()).detail("offset", offset).detail("totalBytes", totalBytes).detail("consumed", consumed).detail("originalOffset", originalOffset);
throw restore_corrupted_data();
}
return result;
}
catch (Error& e) {
TraceEvent(e.code() == error_code_restore_missing_data ? SevWarn : SevError, "BA_decodeBackupLogValue").error(e).GetLastError().detail("valueSize", value.size()).detail("value", printable(value));
throw;
}
}
void decodeBackupLogValue(Arena& arena, VectorRef<MutationRef>& result, int& mutationSize, StringRef value, StringRef addPrefix, StringRef removePrefix, Version version, Reference<KeyRangeMap<Version>> key_version) {
try {
uint64_t offset(0);
uint64_t protocolVersion = 0;
memcpy(&protocolVersion, value.begin(), sizeof(uint64_t));
offset += sizeof(uint64_t);
if (protocolVersion <= 0x0FDB00A200090001){
TraceEvent(SevError, "decodeBackupLogValue").detail("incompatible_protocol_version", protocolVersion)
.detail("valueSize", value.size()).detail("value", printable(value));
throw incompatible_protocol_version();
}
uint32_t totalBytes = 0;
memcpy(&totalBytes, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t consumed = 0;
if(totalBytes + offset > value.size())
throw restore_missing_data();
int originalOffset = offset;
while (consumed < totalBytes){
uint32_t type = 0;
memcpy(&type, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len1 = 0;
memcpy(&len1, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
uint32_t len2 = 0;
memcpy(&len2, value.begin() + offset, sizeof(uint32_t));
offset += sizeof(uint32_t);
ASSERT(offset+len1+len2<=value.size() && isValidMutationType(type));
MutationRef logValue;
Arena tempArena;
logValue.type = type;
logValue.param1 = value.substr(offset, len1);
offset += len1;
logValue.param2 = value.substr(offset, len2);
offset += len2;
if (logValue.type == MutationRef::ClearRange) {
KeyRangeRef range(logValue.param1, logValue.param2);
auto ranges = key_version->intersectingRanges(range);
for (auto r : ranges) {
if (version > r.value() && r.value() != invalidVersion) {
KeyRef minKey = std::min(r.range().end, range.end);
if (minKey == (removePrefix == StringRef() ? normalKeys.end : strinc(removePrefix))) {
logValue.param1 = std::max(r.range().begin, range.begin);
if(removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
}
if(addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
logValue.param2 = addPrefix == StringRef() ? normalKeys.end : strinc(addPrefix, tempArena);
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
}
else {
logValue.param1 = std::max(r.range().begin, range.begin);
logValue.param2 = minKey;
if(removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
logValue.param2 = logValue.param2.removePrefix(removePrefix);
}
if(addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
logValue.param2 = logValue.param2.withPrefix(addPrefix, tempArena);
}
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
}
}
}
}
else {
Version ver = key_version->rangeContaining(logValue.param1).value();
//TraceEvent("ApplyMutation").detail("logValue", logValue.toString()).detail("version", version).detail("ver", ver).detail("apply", version > ver && ver != invalidVersion);
if (version > ver && ver != invalidVersion) {
if(removePrefix.size()) {
logValue.param1 = logValue.param1.removePrefix(removePrefix);
}
if(addPrefix.size()) {
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
}
result.push_back_deep(arena, logValue);
mutationSize += logValue.expectedSize();
}
}
consumed += BackupAgentBase::logHeaderSize + len1 + len2;
}
ASSERT(consumed == totalBytes);
if (value.size() != offset) {
TraceEvent(SevError, "BA_decodeBackupLogValue").detail("unexpected_extra_data_size", value.size()).detail("offset", offset).detail("totalBytes", totalBytes).detail("consumed", consumed).detail("originalOffset", originalOffset);
throw restore_corrupted_data();
}
}
catch (Error& e) {
TraceEvent(e.code() == error_code_restore_missing_data ? SevWarn : SevError, "BA_decodeBackupLogValue").error(e).GetLastError().detail("valueSize", value.size()).detail("value", printable(value));
throw;
}
}
static double lastErrorTime = 0;
ACTOR Future<Void> logErrorWorker(Reference<ReadYourWritesTransaction> tr, Key keyErrors, std::string message) {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
if(now() - lastErrorTime > CLIENT_KNOBS->BACKUP_ERROR_DELAY) {
TraceEvent("BA_logError").detail("key", printable(keyErrors)).detail("message", message);
lastErrorTime = now();
}
tr->set(keyErrors, message);
return Void();
}
Future<Void> logError(Database cx, Key keyErrors, const std::string& message) {
return runRYWTransaction(cx, [=](Reference<ReadYourWritesTransaction> tr){return logErrorWorker(tr, keyErrors, message); });
}
Future<Void> logError(Reference<ReadYourWritesTransaction> tr, Key keyErrors, const std::string& message) {
return logError(tr->getDatabase(), keyErrors, message);
}
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RangeResultWithVersion> results, Reference<FlowLock> lock,
KeyRangeRef range, bool terminator, bool systemAccess, bool lockAware) {
state KeySelector begin = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state FlowLock::Releaser releaser;
loop{
try {
if (systemAccess)
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lockAware)
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
//add lock
releaser.release();
Void _ = wait(lock->take(TaskDefaultYield, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT));
releaser = FlowLock::Releaser(*lock, limits.bytes + CLIENT_KNOBS->VALUE_SIZE_LIMIT + CLIENT_KNOBS->SYSTEM_KEY_SIZE_LIMIT);
state Standalone<RangeResultRef> values = wait(tr->getRange(begin, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if(values.size() > 1 && BUGGIFY) {
values.resize(values.arena(), values.size() / 2);
values.more = true;
// Half of the time wait for this tr to expire so that the next read is at a different version
if(g_random->random01() < 0.5)
Void _ = wait(delay(6.0));
}
releaser.remaining -= values.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(RangeResultWithVersion(values, tr->getReadVersion().get()));
if (values.size() > 0)
begin = firstGreaterThan(values.end()[-1].key);
if (!values.more && !limits.isReached()) {
if(terminator)
results.sendError(end_of_stream());
2017-05-26 04:48:44 +08:00
return Void();
}
}
catch (Error &e) {
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
2017-05-26 04:48:44 +08:00
throw;
tr = Reference<ReadYourWritesTransaction>(new ReadYourWritesTransaction(cx));
}
}
}
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Future<Void> active, Reference<FlowLock> lock,
KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy,
bool terminator, bool systemAccess, bool lockAware, std::function< Future<Void>(Reference<ReadYourWritesTransaction> tr) > withEachFunction)
{
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
state RCGroup rcGroup = RCGroup();
state uint64_t skipGroup(ULLONG_MAX);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state FlowLock::Releaser releaser;
loop{
try {
if (systemAccess)
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
if (lockAware)
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Future<Void> withEach = Void();
if (withEachFunction) withEach = withEachFunction(tr);
state Standalone<RangeResultRef> rangevalue = wait(tr->getRange(nextKey, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if(rangevalue.size() > 1 && BUGGIFY) {
rangevalue.resize(rangevalue.arena(), rangevalue.size() / 2);
rangevalue.more = true;
// Half of the time wait for this tr to expire so that the next read is at a different version
if(g_random->random01() < 0.5)
Void _ = wait(delay(6.0));
}
Void _ = wait(withEach);
//add lock
Void _ = wait(active);
releaser.release();
Void _ = wait(lock->take(TaskDefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());
int index(0);
for (auto & s : rangevalue){
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("log_readCommitted").detail("groupKey", groupKey).detail("skipGroup", skipGroup).detail("nextKey", printable(nextKey.key)).detail("end", printable(end.key)).detail("valuesize", value.size()).detail("index",index++).detail("size",s.value.size());
if (groupKey != skipGroup){
if (rcGroup.version == -1){
rcGroup.version = tr->getReadVersion().get();
rcGroup.groupKey = groupKey;
}
else if (rcGroup.groupKey != groupKey) {
//TraceEvent("log_readCommitted").detail("sendGroup0", rcGroup.groupKey).detail("itemSize", rcGroup.items.size()).detail("data_length",rcGroup.items[0].value.size());
//state uint32_t len(0);
//for (size_t j = 0; j < rcGroup.items.size(); ++j) {
// len += rcGroup.items[j].value.size();
//}
//TraceEvent("SendGroup").detail("groupKey", rcGroup.groupKey).detail("version", rcGroup.version).detail("length", len).detail("releaser.remaining", releaser.remaining);
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(rcGroup);
nextKey = firstGreaterThan(rcGroup.items.end()[-1].key);
skipGroup = rcGroup.groupKey;
rcGroup = RCGroup();
rcGroup.version = tr->getReadVersion().get();
rcGroup.groupKey = groupKey;
}
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
if (!rangevalue.more) {
if (rcGroup.version != -1){
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
//TraceEvent("log_readCommitted").detail("sendGroup1", rcGroup.groupKey).detail("itemSize", rcGroup.items.size()).detail("data_length", rcGroup.items[0].value.size());
results.send(rcGroup);
}
if(terminator)
results.sendError(end_of_stream());
2017-05-26 04:48:44 +08:00
return Void();
}
nextKey = firstGreaterThan(rangevalue.end()[-1].key);
}
catch (Error &e) {
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
2017-05-26 04:48:44 +08:00
throw;
Void _ = wait(tr->onError(e));
}
}
}
ACTOR Future<Void> readCommitted(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock,
KeyRangeRef range, std::function< std::pair<uint64_t, uint32_t>(Key key) > groupBy)
{
state KeySelector nextKey = firstGreaterOrEqual(range.begin);
state KeySelector end = firstGreaterOrEqual(range.end);
state GetRangeLimits limits(CLIENT_KNOBS->ROW_LIMIT_UNLIMITED, (g_network->isSimulated() && !g_simulator.speedUpSimulation) ? CLIENT_KNOBS->BACKUP_SIMULATED_LIMIT_BYTES : CLIENT_KNOBS->BACKUP_GET_RANGE_LIMIT_BYTES);
state RCGroup rcGroup = RCGroup();
state uint64_t skipGroup(ULLONG_MAX);
state Reference<ReadYourWritesTransaction> tr(new ReadYourWritesTransaction(cx));
state FlowLock::Releaser releaser;
loop{
try {
tr->setOption(FDBTransactionOptions::ACCESS_SYSTEM_KEYS);
tr->setOption(FDBTransactionOptions::LOCK_AWARE);
state Standalone<RangeResultRef> rangevalue = wait(tr->getRange(nextKey, end, limits));
// When this buggify line is enabled, if there are more than 1 result then use half of the results
if(rangevalue.size() > 1 && BUGGIFY) {
rangevalue.resize(rangevalue.arena(), rangevalue.size() / 2);
rangevalue.more = true;
// Half of the time wait for this tr to expire so that the next read is at a different version
if(g_random->random01() < 0.5)
Void _ = wait(delay(6.0));
}
releaser.release();
Void _ = wait(lock->take(TaskDefaultYield, rangevalue.expectedSize() + rcGroup.items.expectedSize()));
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());
int index(0);
for (auto & s : rangevalue){
uint64_t groupKey = groupBy(s.key).first;
//TraceEvent("log_readCommitted").detail("groupKey", groupKey).detail("skipGroup", skipGroup).detail("nextKey", printable(nextKey.key)).detail("end", printable(end.key)).detail("valuesize", value.size()).detail("index",index++).detail("size",s.value.size());
if (groupKey != skipGroup){
if (rcGroup.version == -1){
rcGroup.version = tr->getReadVersion().get();
rcGroup.groupKey = groupKey;
}
else if (rcGroup.groupKey != groupKey) {
//TraceEvent("log_readCommitted").detail("sendGroup0", rcGroup.groupKey).detail("itemSize", rcGroup.items.size()).detail("data_length",rcGroup.items[0].value.size());
state uint32_t len(0);
for (size_t j = 0; j < rcGroup.items.size(); ++j) {
len += rcGroup.items[j].value.size();
}
//TraceEvent("SendGroup").detail("groupKey", rcGroup.groupKey).detail("version", rcGroup.version).detail("length", len).detail("releaser.remaining", releaser.remaining);
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
results.send(rcGroup);
nextKey = firstGreaterThan(rcGroup.items.end()[-1].key);
skipGroup = rcGroup.groupKey;
rcGroup = RCGroup();
rcGroup.version = tr->getReadVersion().get();
rcGroup.groupKey = groupKey;
}
rcGroup.items.push_back_deep(rcGroup.items.arena(), s);
}
}
if (!rangevalue.more) {
if (rcGroup.version != -1){
releaser.remaining -= rcGroup.items.expectedSize(); //its the responsibility of the caller to release after this point
ASSERT(releaser.remaining >= 0);
//TraceEvent("log_readCommitted").detail("sendGroup1", rcGroup.groupKey).detail("itemSize", rcGroup.items.size()).detail("data_length", rcGroup.items[0].value.size());
results.send(rcGroup);
}
results.sendError(end_of_stream());
return Void();
}
nextKey = firstGreaterThan(rangevalue.end()[-1].key);
}
catch (Error &e) {
if (e.code() != error_code_transaction_too_old && e.code() != error_code_future_version)
2017-05-26 04:48:44 +08:00
throw;
Void _ = wait(tr->onError(e));
}
}
}
ACTOR Future<int> dumpData(Database cx, PromiseStream<RCGroup> results, Reference<FlowLock> lock, Key uid, Key addPrefix, Key removePrefix, RequestStream<CommitTransactionRequest> commit,
2017-05-26 04:48:44 +08:00
NotifiedVersion* committedVersion, Optional<Version> endVersion, Key rangeBegin, PromiseStream<Future<Void>> addActor, FlowLock* commitLock, Reference<KeyRangeMap<Version>> keyVersion ) {
state Version lastVersion = invalidVersion;
state bool endOfStream = false;
state int totalBytes = 0;
loop {
state CommitTransactionRequest req;
state Version newBeginVersion = invalidVersion;
state int mutationSize = 0;
loop {
try {
RCGroup group = waitNext(results.getFuture());
lock->release(group.items.expectedSize());
BinaryWriter bw(Unversioned());
for(int i = 0; i < group.items.size(); ++i) {
bw.serializeBytes(group.items[i].value);
}
decodeBackupLogValue(req.arena, req.transaction.mutations, mutationSize, bw.toStringRef(), addPrefix, removePrefix, group.groupKey, keyVersion);
newBeginVersion = group.groupKey + 1;
if(mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
break;
}
}
catch (Error &e) {
if (e.code() == error_code_end_of_stream) {
if(endVersion.present() && endVersion.get() > lastVersion && endVersion.get() > newBeginVersion) {
newBeginVersion = endVersion.get();
}
if(newBeginVersion == invalidVersion)
return totalBytes;
endOfStream = true;
break;
}
throw;
}
}
Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
Key rangeEnd = getApplyKey(newBeginVersion, uid);
2017-05-26 04:48:44 +08:00
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
2017-05-26 04:48:44 +08:00
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));
2017-05-26 04:48:44 +08:00
// The commit request contains no read conflict ranges, so regardless of what read version we
// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
// for our transaction to be aborted due to conflicts.
2017-05-26 04:48:44 +08:00
req.transaction.read_snapshot = committedVersion->get();
req.isLockAware = true;
totalBytes += mutationSize;
Void _ = wait( commitLock->take(TaskDefaultYield, mutationSize) );
addActor.send( commitLock->releaseWhen( success(commit.getReply(req)), mutationSize ) );
if(endOfStream) {
return totalBytes;
}
}
}
ACTOR Future<Void> coalesceKeyVersionCache(Key uid, Version endVersion, Reference<KeyRangeMap<Version>> keyVersion, RequestStream<CommitTransactionRequest> commit, NotifiedVersion* committedVersion, PromiseStream<Future<Void>> addActor, FlowLock* commitLock) {
Version lastVersion = -1000;
int64_t removed = 0;
state CommitTransactionRequest req;
state int64_t mutationSize = 0;
Key mapPrefix = uid.withPrefix(applyMutationsKeyVersionMapRange.begin);
for(auto it : keyVersion->ranges()) {
if( lastVersion == -1000 ) {
lastVersion = it.value();
} else {
Version ver = it.value();
if(ver < endVersion && lastVersion < endVersion && ver != invalidVersion && lastVersion != invalidVersion) {
Key removeKey = it.range().begin.withPrefix(mapPrefix);
Key removeEnd = keyAfter(removeKey);
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, removeKey, removeEnd));
mutationSize += removeKey.size() + removeEnd.size();
removed--;
} else {
lastVersion = ver;
}
}
}
if(removed != 0) {
Key countKey = uid.withPrefix(applyMutationsKeyVersionCountRange.begin);
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(countKey));
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::AddValue, countKey, StringRef((uint8_t*)&removed, 8)));
req.transaction.read_snapshot = committedVersion->get();
req.isLockAware = true;
Void _ = wait( commitLock->take(TaskDefaultYield, mutationSize) );
addActor.send( commitLock->releaseWhen( success(commit.getReply(req)), mutationSize ) );
}
return Void();
}
ACTOR Future<Void> applyMutations(Database cx, Key uid, Key addPrefix, Key removePrefix, Version beginVersion, Version* endVersion, RequestStream<CommitTransactionRequest> commit, NotifiedVersion* committedVersion, Reference<KeyRangeMap<Version>> keyVersion ) {
state FlowLock commitLock(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
state PromiseStream<Future<Void>> addActor;
state Future<Void> error = actorCollection( addActor.getFuture() );
state int maxBytes = CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES;
try {
loop {
if(beginVersion >= *endVersion) {
Void _ = wait( commitLock.take(TaskDefaultYield, CLIENT_KNOBS->BACKUP_LOCK_BYTES) );
commitLock.release(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
if(beginVersion >= *endVersion) {
return Void();
}
}
int rangeCount = std::max(1, CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES / maxBytes);
state Version newEndVersion = std::min(*endVersion, ((beginVersion / CLIENT_KNOBS->APPLY_BLOCK_SIZE) + rangeCount) * CLIENT_KNOBS->APPLY_BLOCK_SIZE);
state Standalone<VectorRef<KeyRangeRef>> ranges = getApplyRanges(beginVersion, newEndVersion, uid);
state size_t idx;
state std::vector<PromiseStream<RCGroup>> results;
state std::vector<Future<Void>> rc;
state std::vector<Reference<FlowLock>> locks;
for (int i = 0; i < ranges.size(); ++i) {
results.push_back(PromiseStream<RCGroup>());
locks.push_back(Reference<FlowLock>( new FlowLock(std::max(CLIENT_KNOBS->APPLY_MAX_LOCK_BYTES/ranges.size(), CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES))));
rc.push_back(readCommitted(cx, results[i], locks[i], ranges[i], decodeBKMutationLogKey));
}
maxBytes = std::max<int>(maxBytes*CLIENT_KNOBS->APPLY_MAX_DECAY_RATE, CLIENT_KNOBS->APPLY_MIN_LOCK_BYTES);
for (idx = 0; idx < ranges.size(); ++idx) {
int bytes = wait(dumpData(cx, results[idx], locks[idx], uid, addPrefix, removePrefix, commit, committedVersion, idx==ranges.size()-1 ? newEndVersion : Optional<Version>(), ranges[idx].begin, addActor, &commitLock, keyVersion));
maxBytes = std::max<int>(CLIENT_KNOBS->APPLY_MAX_INCREASE_FACTOR*bytes, maxBytes);
if(error.isError()) throw error.getError();
}
Void _ = wait(coalesceKeyVersionCache(uid, newEndVersion, keyVersion, commit, committedVersion, addActor, &commitLock));
beginVersion = newEndVersion;
}
} catch( Error &e ) {
TraceEvent(e.code() == error_code_restore_missing_data ? SevWarnAlways : SevError, "AM_error").error(e);
throw;
}
}