parent
166bcd9ae2
commit
82bba5af0f
|
@ -323,8 +323,14 @@ ACTOR static Future<Void> decodeBackupLogValue(Arena* arena,
|
|||
offset += sizeof(uint32_t);
|
||||
state uint32_t consumed = 0;
|
||||
|
||||
if (totalBytes + offset > value.size())
|
||||
if (totalBytes + offset > value.size()) {
|
||||
TraceEvent(SevError, "OffsetOutOfBoundary")
|
||||
.detail("TotalBytes", totalBytes)
|
||||
.detail("Offset", offset)
|
||||
.detail("Version", version)
|
||||
.detail("ValueSize", value.size());
|
||||
throw restore_missing_data();
|
||||
}
|
||||
|
||||
state int originalOffset = offset;
|
||||
state DatabaseConfiguration config = wait(getDatabaseConfiguration(cx));
|
||||
|
@ -640,8 +646,13 @@ ACTOR Future<Void> readCommitted(Database cx,
|
|||
releaser = FlowLock::Releaser(*lock, rangevalue.expectedSize() + rcGroup.items.expectedSize());
|
||||
|
||||
for (auto& s : rangevalue) {
|
||||
uint64_t groupKey = groupBy(s.key).first;
|
||||
//TraceEvent("Log_ReadCommitted").detail("GroupKey", groupKey).detail("SkipGroup", skipGroup).detail("NextKey", nextKey.key).detail("End", end.key).detail("Valuesize", value.size()).detail("Index",index++).detail("Size",s.value.size());
|
||||
Version groupKey = groupBy(s.key).first; // mutation's commit version
|
||||
// TraceEvent("Log_ReadCommitted")
|
||||
// .detail("GroupKey", groupKey)
|
||||
// .detail("SkipGroup", skipGroup)
|
||||
// .detail("Begin", range.begin)
|
||||
// .detail("End", range.end)
|
||||
// .detail("Size", s.value.size());
|
||||
if (groupKey != skipGroup) {
|
||||
if (rcGroup.version == -1) {
|
||||
rcGroup.version = tr.getReadVersion().get();
|
||||
|
@ -713,7 +724,6 @@ ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
|
|||
NotifiedVersion* committedVersion,
|
||||
int* totalBytes,
|
||||
int* mutationSize,
|
||||
PromiseStream<Future<Void>> addActor,
|
||||
FlowLock* commitLock,
|
||||
PublicRequestStream<CommitTransactionRequest> commit) {
|
||||
Key applyBegin = uid.withPrefix(applyMutationsBeginRange.begin);
|
||||
|
@ -737,10 +747,19 @@ ACTOR Future<Void> sendCommitTransactionRequest(CommitTransactionRequest req,
|
|||
|
||||
*totalBytes += *mutationSize;
|
||||
wait(commitLock->take(TaskPriority::DefaultYield, *mutationSize));
|
||||
addActor.send(commitLock->releaseWhen(success(commit.getReply(req)), *mutationSize));
|
||||
Future<Void> commitAndUnlock = commitLock->releaseWhen(success(commit.getReply(req)), *mutationSize);
|
||||
// If tenant map is changing, we need to wait until it's committed before processing next mutations.
|
||||
// Next muations need the updated tenant map for filtering.
|
||||
// Because we are bumping applyBegin version, we need to wait for the commit to be done.
|
||||
// Otherwise, an update to the applyEnd key will trigger another applyMutation() which can
|
||||
// have an overlapping range with the current applyMutation() and cause conflicts.
|
||||
wait(commitAndUnlock);
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Decodes the backup mutation log and send the mutations to the CommitProxy.
|
||||
// The mutation logs are grouped by version and passed in as a stream of RCGroup from readCommitted().
|
||||
// The mutations are then decoded and sent to the CommitProxy in a batch.
|
||||
ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
||||
PromiseStream<RCGroup> results,
|
||||
Reference<FlowLock> lock,
|
||||
|
@ -751,7 +770,6 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
|||
NotifiedVersion* committedVersion,
|
||||
Optional<Version> endVersion,
|
||||
Key rangeBegin,
|
||||
PromiseStream<Future<Void>> addActor,
|
||||
FlowLock* commitLock,
|
||||
Reference<KeyRangeMap<Version>> keyVersion,
|
||||
std::map<int64_t, TenantName>* tenantMap,
|
||||
|
@ -812,7 +830,6 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
|||
committedVersion,
|
||||
&totalBytes,
|
||||
&mutationSize,
|
||||
addActor,
|
||||
commitLock,
|
||||
commit));
|
||||
req = CommitTransactionRequest();
|
||||
|
@ -848,16 +865,9 @@ ACTOR Future<int> kvMutationLogToTransactions(Database cx,
|
|||
throw;
|
||||
}
|
||||
}
|
||||
wait(sendCommitTransactionRequest(req,
|
||||
uid,
|
||||
newBeginVersion,
|
||||
rangeBegin,
|
||||
committedVersion,
|
||||
&totalBytes,
|
||||
&mutationSize,
|
||||
addActor,
|
||||
commitLock,
|
||||
commit));
|
||||
// TraceEvent("MutationLogRestore").detail("BeginVersion", newBeginVersion);
|
||||
wait(sendCommitTransactionRequest(
|
||||
req, uid, newBeginVersion, rangeBegin, committedVersion, &totalBytes, &mutationSize, commitLock, commit));
|
||||
if (endOfStream) {
|
||||
return totalBytes;
|
||||
}
|
||||
|
@ -932,6 +942,7 @@ ACTOR Future<Void> applyMutations(Database cx,
|
|||
try {
|
||||
loop {
|
||||
if (beginVersion >= *endVersion) {
|
||||
// Why do we need to take a lock here?
|
||||
wait(commitLock.take(TaskPriority::DefaultYield, CLIENT_KNOBS->BACKUP_LOCK_BYTES));
|
||||
commitLock.release(CLIENT_KNOBS->BACKUP_LOCK_BYTES);
|
||||
if (beginVersion >= *endVersion) {
|
||||
|
@ -969,7 +980,6 @@ ACTOR Future<Void> applyMutations(Database cx,
|
|||
committedVersion,
|
||||
idx == ranges.size() - 1 ? newEndVersion : Optional<Version>(),
|
||||
ranges[idx].begin,
|
||||
addActor,
|
||||
&commitLock,
|
||||
keyVersion,
|
||||
tenantMap,
|
||||
|
|
|
@ -5086,7 +5086,7 @@ struct LogInfo : public ReferenceCounted<LogInfo> {
|
|||
Version endVersion;
|
||||
int64_t offset;
|
||||
|
||||
LogInfo() : offset(0){};
|
||||
LogInfo() : offset(0) {}
|
||||
};
|
||||
|
||||
class FileBackupAgentImpl {
|
||||
|
|
|
@ -520,12 +520,13 @@ public:
|
|||
|
||||
using RangeResultWithVersion = std::pair<RangeResult, Version>;
|
||||
|
||||
// RCGroup contains the backup mutations for a commit version, i.e., groupKey.
|
||||
struct RCGroup {
|
||||
RangeResult items;
|
||||
Version version;
|
||||
uint64_t groupKey;
|
||||
Version version; // this is read version for this group
|
||||
Version groupKey; // this is the original commit version for this group
|
||||
|
||||
RCGroup() : version(-1), groupKey(ULLONG_MAX){};
|
||||
RCGroup() : version(-1), groupKey(ULLONG_MAX) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
|
@ -569,6 +570,8 @@ ACTOR Future<Void> readCommitted(Database cx,
|
|||
Terminator terminator = Terminator::True,
|
||||
AccessSystemKeys systemAccess = AccessSystemKeys::False,
|
||||
LockAware lockAware = LockAware::False);
|
||||
|
||||
// Applies the mutations between the beginVersion and endVersion to the database during a restore.
|
||||
ACTOR Future<Void> applyMutations(Database cx,
|
||||
Key uid,
|
||||
Key addPrefix,
|
||||
|
|
Loading…
Reference in New Issue