Backup Mutation Log Separates Tenant Map Modifications During Restore (#9292)
mutation log separates tenant map modifications
This commit is contained in:
parent
2fe32610d6
commit
bf85c9f8af
|
@ -23,6 +23,7 @@
|
|||
|
||||
#include "fdbclient/BackupAgent.actor.h"
|
||||
#include "fdbclient/BlobCipher.h"
|
||||
#include "fdbclient/CommitProxyInterface.h"
|
||||
#include "fdbclient/CommitTransaction.h"
|
||||
#include "fdbclient/FDBTypes.h"
|
||||
#include "fdbclient/GetEncryptCipherKeys.actor.h"
|
||||
|
@ -290,10 +291,28 @@ std::pair<Version, uint32_t> decodeBKMutationLogKey(Key key) {
|
|||
bigEndian32(*(int32_t*)(key.begin() + backupLogPrefixBytes + sizeof(UID) + sizeof(uint8_t) + sizeof(int64_t))));
|
||||
}
|
||||
|
||||
void _addResult(bool* tenantMapChanging,
|
||||
VectorRef<MutationRef>* result,
|
||||
int* mutationSize,
|
||||
Arena* arena,
|
||||
MutationRef logValue,
|
||||
KeyRangeRef tenantMapRange) {
|
||||
*tenantMapChanging = *tenantMapChanging || TenantAPI::tenantMapChanging(logValue, tenantMapRange);
|
||||
result->push_back_deep(*arena, logValue);
|
||||
*mutationSize += logValue.expectedSize();
|
||||
}
|
||||
|
||||
/*
|
||||
This actor is responsible for taking an original transaction which was added to the backup mutation log (represented
|
||||
by "value" parameter), breaking it up into the individual MutationRefs (that constitute the transaction), decrypting
|
||||
each mutation (if needed) and adding/removing prefixes from the mutations. The final mutations are then added to the
|
||||
"result" vector alongside their encrypted counterparts (which is added to the "encryptedResult" vector)
|
||||
*/
|
||||
ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
||||
VectorRef<MutationRef>* result,
|
||||
VectorRef<Optional<MutationRef>>* encryptedResult,
|
||||
int* mutationSize,
|
||||
bool* tenantMapChanging,
|
||||
Standalone<StringRef> value,
|
||||
Key addPrefix,
|
||||
Key removePrefix,
|
||||
|
@ -325,6 +344,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
|||
|
||||
state int originalOffset = offset;
|
||||
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
|
||||
state KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;
|
||||
|
||||
while (consumed < totalBytes) {
|
||||
uint32_t type = 0;
|
||||
|
@ -410,8 +430,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
|||
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
|
||||
}
|
||||
logValue.param2 = addPrefix == StringRef() ? allKeys.end : strinc(addPrefix, tempArena);
|
||||
result->push_back_deep(*arena, logValue);
|
||||
*mutationSize += logValue.expectedSize();
|
||||
_addResult(tenantMapChanging, result, mutationSize, arena, logValue, tenantMapRange);
|
||||
} else {
|
||||
logValue.param1 = std::max(r.range().begin, range.begin);
|
||||
logValue.param2 = minKey;
|
||||
|
@ -423,8 +442,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
|||
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
|
||||
logValue.param2 = logValue.param2.withPrefix(addPrefix, tempArena);
|
||||
}
|
||||
result->push_back_deep(*arena, logValue);
|
||||
*mutationSize += logValue.expectedSize();
|
||||
_addResult(tenantMapChanging, result, mutationSize, arena, logValue, tenantMapRange);
|
||||
}
|
||||
if (originalLogValue.param1 == logValue.param1 && originalLogValue.param2 == logValue.param2) {
|
||||
encryptedResult->push_back_deep(*arena, encryptedLogValue);
|
||||
|
@ -443,8 +461,7 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
|||
if (addPrefix.size()) {
|
||||
logValue.param1 = logValue.param1.withPrefix(addPrefix, tempArena);
|
||||
}
|
||||
result->push_back_deep(*arena, logValue);
|
||||
*mutationSize += logValue.expectedSize();
|
||||
_addResult(tenantMapChanging, result, mutationSize, arena, logValue, tenantMapRange);
|
||||
// If we did not remove/add prefixes to the mutation then keep the original encrypted mutation so we
|
||||
// do not have to re-encrypt unnecessarily
|
||||
if (originalLogValue.param1 == logValue.param1 && originalLogValue.param2 == logValue.param2) {
|
||||
|
@ -695,6 +712,41 @@ Future<Void> readCommitted(Database cx,
|
|||
cx, results, Void(), lock, range, groupBy, Terminator::True, AccessSystemKeys::True, LockAware::True);
|
||||
}
|
||||
|
||||
// Sends one batch of restored mutation-log mutations to the commit proxy.
// Before sending, appends the restore bookkeeping mutations for this restore
// (identified by uid): advancing the apply-mutations begin version key and
// clearing the consumed portion of the apply-log key space up to
// newBeginVersion. Commit byte throughput is throttled via commitLock, and the
// release future is handed to addActor so the caller's actor collection keeps
// it alive until the proxy acknowledges the commit.
ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
                                                Key uid,
                                                Version newBeginVersion,
                                                Key rangeBegin,
                                                NotifiedVersion* committedVersion,
                                                int* totalBytes,
                                                int* mutationSize,
                                                PromiseStream<Future<Void>> addActor,
                                                FlowLock* commitLock,
                                                PublicRequestStream<CommitTransactionRequest> commit) {
	Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
	Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
	Key rangeEnd = getApplyKey(newBeginVersion, uid);

	// mutations and encrypted mutations (and their relationship) is described in greater detail in the definition of
	// CommitTransactionRef in CommitTransaction.h
	req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
	req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
	req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
	req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
	req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
	req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));

	// The commit request contains no read conflict ranges, so regardless of what read version we
	// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
	// for our transaction to be aborted due to conflicts.
	req.transaction.read_snapshot = committedVersion->get();
	req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;

	*totalBytes += *mutationSize;
	// Acquire commitLock proportionally to the batch size so total in-flight
	// commit bytes stay bounded; released when the proxy's reply arrives.
	wait(commitLock->take(TaskPriority::DefaultYield, *mutationSize));
	addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), *mutationSize));
	return Void();
}
|
||||
|
||||
ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
||||
PromiseStream<RCGroup> results,
|
||||
Reference<FlowLock> lock,
|
||||
|
@ -717,20 +769,26 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
|||
state CommitTransactionRequest req;
|
||||
state Version newBeginVersion = invalidVersion;
|
||||
state int mutationSize = 0;
|
||||
state bool tenantMapChanging = false;
|
||||
loop {
|
||||
try {
|
||||
state RCGroup group = waitNext(results.getFuture());
|
||||
state CommitTransactionRequest curReq;
|
||||
lock->release(group.items.expectedSize());
|
||||
state int curBatchMutationSize = 0;
|
||||
tenantMapChanging = false;
|
||||
|
||||
BinaryWriter bw(Unversioned());
|
||||
for (int i = 0; i < group.items.size(); ++i) {
|
||||
bw.serializeBytes(group.items[i].value);
|
||||
}
|
||||
// Parse a single transaction from the backup mutation log
|
||||
Standalone<StringRef> value = bw.toValue();
|
||||
wait(decodeBackupLogValue(&req.arena,
|
||||
&req.transaction.mutations,
|
||||
&req.transaction.encryptedMutations,
|
||||
&mutationSize,
|
||||
wait(decodeBackupLogValue(&curReq.arena,
|
||||
&curReq.transaction.mutations,
|
||||
&curReq.transaction.encryptedMutations,
|
||||
&curBatchMutationSize,
|
||||
&tenantMapChanging,
|
||||
value,
|
||||
addPrefix,
|
||||
removePrefix,
|
||||
|
@ -739,8 +797,48 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
|||
cx,
|
||||
tenantMap,
|
||||
provisionalProxy));
|
||||
|
||||
// A single call to decodeBackupLogValue (above) will only parse mutations from a single transaction,
|
||||
// however in the code below we batch the results across several calls to decodeBackupLogValue and send
|
||||
// it in one big CommitTransactionRequest (so one CTR contains mutations from multiple transactions).
|
||||
// Generally, this would be fine since the mutations in the log are ordered (and thus so are the results
|
||||
// after calling decodeBackupLogValue). However in the CommitProxy we do not allow mutations which
|
||||
// change the tenant map to appear alongside regular normalKey mutations in a single
|
||||
// CommitTransactionRequest. Thus the code below will immediately send any mutations accumulated thus
|
||||
// far if the latest call to decodeBackupLogValue contained a transaction which changed the tenant map
|
||||
// (before processing the mutations which caused the tenant map to change).
|
||||
if (tenantMapChanging && req.transaction.mutations.size()) {
|
||||
// If the tenantMap is changing send the previous CommitTransactionRequest to the CommitProxy
|
||||
TraceEvent("MutationLogRestoreTenantMapChanging").detail("BeginVersion", newBeginVersion);
|
||||
CODE_PROBE(true, "mutation log tenant map changing");
|
||||
wait(sendCommitTransactionRequest(req,
|
||||
uid,
|
||||
newBeginVersion,
|
||||
rangeBegin,
|
||||
committedVersion,
|
||||
&totalBytes,
|
||||
&mutationSize,
|
||||
addActor,
|
||||
commitLock,
|
||||
commit));
|
||||
req = CommitTransactionRequest();
|
||||
mutationSize = 0;
|
||||
}
|
||||
|
||||
state int i;
|
||||
for (i = 0; i < curReq.transaction.mutations.size(); i++) {
|
||||
req.transaction.mutations.push_back_deep(req.arena, curReq.transaction.mutations[i]);
|
||||
req.transaction.encryptedMutations.push_back_deep(req.arena,
|
||||
curReq.transaction.encryptedMutations[i]);
|
||||
}
|
||||
mutationSize += curBatchMutationSize;
|
||||
newBeginVersion = group.groupKey + 1;
|
||||
if (mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
|
||||
|
||||
// At this point if the tenant map changed we would have already sent any normalKey mutations
|
||||
// accumulated thus far, so all that's left to do is to send all the mutations in the offending
|
||||
// transaction that changed the tenant map. This is necessary so that we don't batch these tenant map
|
||||
// mutations with future normalKey mutations (which will result in the same problem discussed above).
|
||||
if (tenantMapChanging || mutationSize >= CLIENT_KNOBS->BACKUP_LOG_WRITE_BATCH_MAX_SIZE) {
|
||||
break;
|
||||
}
|
||||
} catch (Error& e) {
|
||||
|
@ -756,28 +854,16 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
|||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
|
||||
Key versionKey = BinaryWriter::toValue(newBeginVersion, Unversioned());
|
||||
Key rangeEnd = getApplyKey(newBeginVersion, uid);
|
||||
|
||||
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::SetValue, applyBegin, versionKey));
|
||||
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
|
||||
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(applyBegin));
|
||||
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::ClearRange, rangeBegin, rangeEnd));
|
||||
req.transaction.encryptedMutations.push_back_deep(req.arena, Optional<MutationRef>());
|
||||
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(rangeBegin));
|
||||
|
||||
// The commit request contains no read conflict ranges, so regardless of what read version we
|
||||
// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
|
||||
// for our transaction to be aborted due to conflicts.
|
||||
req.transaction.read_snapshot = committedVersion->get();
|
||||
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
|
||||
|
||||
totalBytes += mutationSize;
|
||||
wait(commitLock->take(TaskPriority::DefaultYield, mutationSize));
|
||||
addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), mutationSize));
|
||||
|
||||
wait(sendCommitTransactionRequest(req,
|
||||
uid,
|
||||
newBeginVersion,
|
||||
rangeBegin,
|
||||
committedVersion,
|
||||
&totalBytes,
|
||||
&mutationSize,
|
||||
addActor,
|
||||
commitLock,
|
||||
commit));
|
||||
if (endOfStream) {
|
||||
return totalBytes;
|
||||
}
|
||||
|
|
|
@ -67,6 +67,16 @@ int64_t extractTenantIdFromKeyRef(StringRef s) {
|
|||
return TenantAPI::prefixToId(prefix, EnforceValidTenantId::False);
|
||||
}
|
||||
|
||||
// Returns true if the given mutation modifies the tenant map: either a
// single-key mutation whose key lies under the tenant map prefix, or a
// ClearRange whose range intersects the tenant map subspace.
bool tenantMapChanging(MutationRef const& mutation, KeyRangeRef const& tenantMapRange) {
	const auto mutationType = (MutationRef::Type)mutation.type;
	if (isSingleKeyMutation(mutationType)) {
		return mutation.param1.startsWith(tenantMapRange.begin);
	}
	if (mutationType == MutationRef::ClearRange) {
		return tenantMapRange.intersects(KeyRangeRef(mutation.param1, mutation.param2));
	}
	return false;
}
|
||||
|
||||
// validates whether the lastTenantId and the nextTenantId share the same 2 byte prefix
|
||||
bool nextTenantIdPrefixMatches(int64_t lastTenantId, int64_t nextTenantId) {
|
||||
if (getTenantIdPrefix(nextTenantId) != getTenantIdPrefix(lastTenantId)) {
|
||||
|
|
|
@ -126,6 +126,7 @@ Future<Void> checkTenantMode(Transaction tr, ClusterType expectedClusterType) {
|
|||
TenantMode tenantModeForClusterType(ClusterType clusterType, TenantMode tenantMode);
|
||||
int64_t extractTenantIdFromMutation(MutationRef m);
|
||||
int64_t extractTenantIdFromKeyRef(StringRef s);
|
||||
bool tenantMapChanging(MutationRef const& mutation, KeyRangeRef const& tenantMapRange);
|
||||
bool nextTenantIdPrefixMatches(int64_t lastTenantId, int64_t nextTenantId);
|
||||
int64_t getMaxAllowableTenantId(int64_t curTenantId);
|
||||
int64_t getTenantIdPrefix(int64_t tenantId);
|
||||
|
|
|
@ -1070,17 +1070,6 @@ bool validTenantAccess(MutationRef m, std::map<int64_t, TenantName> const& tenan
|
|||
return true;
|
||||
}
|
||||
|
||||
// Returns true if the given mutation modifies the tenant map: either a
// single-key mutation whose key lies under the tenant map prefix, or a
// ClearRange whose range intersects the tenant map subspace.
// NOTE(review): this looks up the subspace on every call; a variant taking the
// subspace as a parameter would let callers hoist it out of loops.
inline bool tenantMapChanging(MutationRef const& mutation) {
	const KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;
	if (isSingleKeyMutation((MutationRef::Type)mutation.type) && mutation.param1.startsWith(tenantMapRange.begin)) {
		return true;
	} else if (mutation.type == MutationRef::ClearRange &&
	           tenantMapRange.intersects(KeyRangeRef(mutation.param1, mutation.param2))) {
		return true;
	}
	return false;
}
|
||||
|
||||
// return an iterator to the first tenantId whose idToPrefix(id) >= prefix[0..8] in lexicographic order. If no such id,
|
||||
// return tenantMap.end()
|
||||
inline auto lowerBoundTenantId(const StringRef& prefix, const std::map<int64_t, TenantName>& tenantMap) {
|
||||
|
@ -1380,11 +1369,12 @@ Error validateAndProcessTenantAccess(Arena& arena,
|
|||
|
||||
std::vector<std::pair<int, std::vector<MutationRef>>> idxSplitMutations;
|
||||
int newMutationSize = mutations.size();
|
||||
KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;
|
||||
for (int i = 0; i < mutations.size(); ++i) {
|
||||
auto& mutation = mutations[i];
|
||||
Optional<int64_t> tenantId;
|
||||
bool validAccess = true;
|
||||
changeTenant = changeTenant || tenantMapChanging(mutation);
|
||||
changeTenant = changeTenant || TenantAPI::tenantMapChanging(mutation, tenantMapRange);
|
||||
|
||||
if (mutation.type == MutationRef::ClearRange) {
|
||||
int newClearSize = processClearRangeMutation(
|
||||
|
@ -1471,6 +1461,7 @@ Error validateAndProcessTenantAccess(CommitTransactionRequest& tr,
|
|||
void applyMetadataEffect(CommitBatchContext* self) {
|
||||
bool initialState = self->isMyFirstBatch;
|
||||
self->firstStateMutations = self->isMyFirstBatch;
|
||||
KeyRangeRef tenantMapRange = TenantMetadata::tenantMap().subspace;
|
||||
for (int versionIndex = 0; versionIndex < self->resolution[0].stateMutations.size(); versionIndex++) {
|
||||
// pProxyCommitData->logAdapter->setNextVersion( ??? ); << Ideally we would be telling the log adapter that the
|
||||
// pushes in this commit will be in the version at which these state mutations were committed by another proxy,
|
||||
|
@ -1492,7 +1483,9 @@ void applyMetadataEffect(CommitBatchContext* self) {
|
|||
// fail transaction if it contain both of tenant changes and normal key writing
|
||||
auto& mutations = self->resolution[0].stateMutations[versionIndex][transactionIndex].mutations;
|
||||
committed =
|
||||
tenantIds.get().empty() || std::none_of(mutations.begin(), mutations.end(), tenantMapChanging);
|
||||
tenantIds.get().empty() || std::none_of(mutations.begin(), mutations.end(), [&](MutationRef m) {
|
||||
return TenantAPI::tenantMapChanging(m, tenantMapRange);
|
||||
});
|
||||
|
||||
// check if all tenant ids are valid if committed == true
|
||||
committed = committed &&
|
||||
|
|
Loading…
Reference in New Issue