Merge pull request #5496 from xis19/master

Refactor TxnStateRequest handler in commitProxyServerCore
This commit is contained in:
Jingyu Zhou 2021-08-29 11:14:45 -07:00 committed by GitHub
commit e576fd45f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 171 additions and 98 deletions

View File

@ -1832,6 +1832,171 @@ ACTOR Future<Void> reportTxnTagCommitCost(UID myID,
}
}
namespace {
struct TransactionStateResolveContext {
// Maximum sequence for txnStateRequest, this is defined when the request last flag is set.
Sequence maxSequence = std::numeric_limits<Sequence>::max();
// Flags marks received transaction state requests, we only process the transaction request when *all* requests are
// received.
std::unordered_set<Sequence> receivedSequences;
ProxyCommitData* pCommitData = nullptr;
// Pointer to transaction state store, shortcut for commitData.txnStateStore
IKeyValueStore* pTxnStateStore = nullptr;
// Actor streams
PromiseStream<Future<Void>>* pActors = nullptr;
// Flag reports if the transaction state request is complete. This request should only happen during recover, i.e.
// once per commit proxy.
bool processed = false;
TransactionStateResolveContext() = default;
TransactionStateResolveContext(ProxyCommitData* pCommitData_, PromiseStream<Future<Void>>* pActors_)
: pCommitData(pCommitData_), pTxnStateStore(pCommitData_->txnStateStore), pActors(pActors_) {
ASSERT(pTxnStateStore != nullptr);
}
};
ACTOR Future<Void> processCompleteTransactionStateRequest(TransactionStateResolveContext* pContext) {
state KeyRange txnKeys = allKeys;
state std::map<Tag, UID> tag_uid;
RangeResult UIDtoTagMap = pContext->pTxnStateStore->readRange(serverTagKeys).get();
for (const KeyValueRef& kv : UIDtoTagMap) {
tag_uid[decodeServerTagValue(kv.value)] = decodeServerTagKey(kv.key);
}
loop {
wait(yield());
RangeResult data =
pContext->pTxnStateStore
->readRange(txnKeys, SERVER_KNOBS->BUGGIFIED_ROW_LIMIT, SERVER_KNOBS->APPLY_MUTATION_BYTES)
.get();
if (!data.size())
break;
((KeyRangeRef&)txnKeys) = KeyRangeRef(keyAfter(data.back().key, txnKeys.arena()), txnKeys.end);
MutationsVec mutations;
std::vector<std::pair<MapPair<Key, ServerCacheInfo>, int>> keyInfoData;
std::vector<UID> src, dest;
ServerCacheInfo info;
// NOTE: An ACTOR will be compiled into several classes, the this pointer is from one of them.
auto updateTagInfo = [this](const std::vector<UID>& uids,
std::vector<Tag>& tags,
std::vector<Reference<StorageInfo>>& storageInfoItems) {
for (const auto& id : uids) {
auto storageInfo = getStorageInfo(id, &pContext->pCommitData->storageCache, pContext->pTxnStateStore);
ASSERT(storageInfo->tag != invalidTag);
tags.push_back(storageInfo->tag);
storageInfoItems.push_back(storageInfo);
}
};
for (auto& kv : data) {
if (!kv.key.startsWith(keyServersPrefix)) {
mutations.emplace_back(mutations.arena(), MutationRef::SetValue, kv.key, kv.value);
continue;
}
KeyRef k = kv.key.removePrefix(keyServersPrefix);
if (k == allKeys.end) {
continue;
}
decodeKeyServersValue(tag_uid, kv.value, src, dest);
info.tags.clear();
info.src_info.clear();
updateTagInfo(src, info.tags, info.src_info);
info.dest_info.clear();
updateTagInfo(dest, info.tags, info.dest_info);
uniquify(info.tags);
keyInfoData.emplace_back(MapPair<Key, ServerCacheInfo>(k, info), 1);
}
// insert keyTag data separately from metadata mutations so that we can do one bulk insert which
// avoids a lot of map lookups.
pContext->pCommitData->keyInfo.rawInsert(keyInfoData);
Arena arena;
bool confChanges;
applyMetadataMutations(SpanID(),
*pContext->pCommitData,
arena,
Reference<ILogSystem>(),
mutations,
/* pToCommit= */ nullptr,
confChanges,
/* popVersion= */ 0,
/* initialCommit= */ true);
} // loop
auto lockedKey = pContext->pTxnStateStore->readValue(databaseLockedKey).get();
pContext->pCommitData->locked = lockedKey.present() && lockedKey.get().size();
pContext->pCommitData->metadataVersion = pContext->pTxnStateStore->readValue(metadataVersionKey).get();
pContext->pTxnStateStore->enableSnapshot();
return Void();
}
ACTOR Future<Void> processTransactionStateRequestPart(TransactionStateResolveContext* pContext,
TxnStateRequest request) {
state const TxnStateRequest& req = request;
state ProxyCommitData& commitData = *pContext->pCommitData;
state PromiseStream<Future<Void>>& addActor = *pContext->pActors;
state Sequence& maxSequence = pContext->maxSequence;
state ReplyPromise<Void> reply = req.reply;
state std::unordered_set<Sequence>& txnSequences = pContext->receivedSequences;
ASSERT(pContext->pCommitData != nullptr);
ASSERT(pContext->pActors != nullptr);
if (pContext->receivedSequences.count(request.sequence)) {
// This part is already received. Still we will re-broadcast it to other CommitProxies
pContext->pActors->send(broadcastTxnRequest(request, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
wait(yield());
return Void();
}
if (request.last) {
// This is the last piece of subsequence, yet other pieces might still on the way.
pContext->maxSequence = request.sequence + 1;
}
pContext->receivedSequences.insert(request.sequence);
// Although we may receive the CommitTransactionRequest for the recovery transaction before all of the
// TxnStateRequest, we will not get a resolution result from any resolver until the master has submitted its initial
// (sequence 0) resolution request, which it doesn't do until we have acknowledged all TxnStateRequests
ASSERT(!pContext->pCommitData->validState.isSet());
for (auto& kv : request.data) {
pContext->pTxnStateStore->set(kv, &request.arena);
}
pContext->pTxnStateStore->commit(true);
if (pContext->receivedSequences.size() == pContext->maxSequence) {
// Received all components of the txnStateRequest
ASSERT(!pContext->processed);
wait(processCompleteTransactionStateRequest(pContext));
pContext->processed = true;
}
pContext->pActors->send(broadcastTxnRequest(request, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
wait(yield());
return Void();
}
} // anonymous namespace
ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
MasterInterface master,
Reference<AsyncVar<ServerDBInfo> const> db,
@ -1851,8 +2016,6 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
state Future<Void> onError =
transformError(actorCollection(addActor.getFuture()), broken_promise(), master_tlog_failed());
state double lastCommit = 0;
state std::set<Sequence> txnSequences;
state Sequence maxSequence = std::numeric_limits<Sequence>::max();
state GetHealthMetricsReply healthMetricsReply;
state GetHealthMetricsReply detailedHealthMetricsReply;
@ -1918,6 +2081,10 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
commitBatcherActor = commitBatcher(
&commitData, batchedCommits, proxy.commit.getFuture(), commitBatchByteLimit, commitBatchesMemoryLimit);
// This has to be declared after the commitData.txnStateStore get initialized
state TransactionStateResolveContext transactionStateResolveContext(&commitData, &addActor);
loop choose {
when(wait(dbInfoChange)) {
dbInfoChange = commitData.db->onChange();
@ -1962,102 +2129,8 @@ ACTOR Future<Void> commitProxyServerCore(CommitProxyInterface proxy,
when(ExclusionSafetyCheckRequest exclCheckReq = waitNext(proxy.exclusionSafetyCheckReq.getFuture())) {
addActor.send(proxyCheckSafeExclusion(db, exclCheckReq));
}
when(state TxnStateRequest req = waitNext(proxy.txnState.getFuture())) {
state ReplyPromise<Void> reply = req.reply;
if (req.last)
maxSequence = req.sequence + 1;
if (!txnSequences.count(req.sequence)) {
txnSequences.insert(req.sequence);
ASSERT(
!commitData.validState
.isSet()); // Although we may receive the CommitTransactionRequest for the recovery transaction
// before all of the TxnStateRequest, we will not get a resolution result from any
// resolver until the master has submitted its initial (sequence 0) resolution
// request, which it doesn't do until we have acknowledged all TxnStateRequests
for (auto& kv : req.data)
commitData.txnStateStore->set(kv, &req.arena);
commitData.txnStateStore->commit(true);
if (txnSequences.size() == maxSequence) {
state KeyRange txnKeys = allKeys;
RangeResult UIDtoTagMap = commitData.txnStateStore->readRange(serverTagKeys).get();
state std::map<Tag, UID> tag_uid;
for (const KeyValueRef& kv : UIDtoTagMap) {
tag_uid[decodeServerTagValue(kv.value)] = decodeServerTagKey(kv.key);
}
loop {
wait(yield());
RangeResult data = commitData.txnStateStore
->readRange(txnKeys,
SERVER_KNOBS->BUGGIFIED_ROW_LIMIT,
SERVER_KNOBS->APPLY_MUTATION_BYTES)
.get();
if (!data.size())
break;
((KeyRangeRef&)txnKeys) = KeyRangeRef(keyAfter(data.back().key, txnKeys.arena()), txnKeys.end);
MutationsVec mutations;
std::vector<std::pair<MapPair<Key, ServerCacheInfo>, int>> keyInfoData;
vector<UID> src, dest;
ServerCacheInfo info;
for (auto& kv : data) {
if (kv.key.startsWith(keyServersPrefix)) {
KeyRef k = kv.key.removePrefix(keyServersPrefix);
if (k != allKeys.end) {
decodeKeyServersValue(tag_uid, kv.value, src, dest);
info.tags.clear();
info.src_info.clear();
info.dest_info.clear();
for (const auto& id : src) {
auto storageInfo =
getStorageInfo(id, &commitData.storageCache, commitData.txnStateStore);
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back(storageInfo->tag);
info.src_info.push_back(storageInfo);
}
for (const auto& id : dest) {
auto storageInfo =
getStorageInfo(id, &commitData.storageCache, commitData.txnStateStore);
ASSERT(storageInfo->tag != invalidTag);
info.tags.push_back(storageInfo->tag);
info.dest_info.push_back(storageInfo);
}
uniquify(info.tags);
keyInfoData.emplace_back(MapPair<Key, ServerCacheInfo>(k, info), 1);
}
} else {
mutations.emplace_back(mutations.arena(), MutationRef::SetValue, kv.key, kv.value);
}
}
// insert keyTag data separately from metadata mutations so that we can do one bulk insert which
// avoids a lot of map lookups.
commitData.keyInfo.rawInsert(keyInfoData);
Arena arena;
bool confChanges;
applyMetadataMutations(SpanID(),
commitData,
arena,
Reference<ILogSystem>(),
mutations,
/* pToCommit= */ nullptr,
confChanges,
/* popVersion= */ 0,
/* initialCommit= */ true);
}
auto lockedKey = commitData.txnStateStore->readValue(databaseLockedKey).get();
commitData.locked = lockedKey.present() && lockedKey.get().size();
commitData.metadataVersion = commitData.txnStateStore->readValue(metadataVersionKey).get();
commitData.txnStateStore->enableSnapshot();
}
}
addActor.send(broadcastTxnRequest(req, SERVER_KNOBS->TXN_STATE_SEND_AMOUNT, true));
wait(yield());
when(TxnStateRequest request = waitNext(proxy.txnState.getFuture())) {
addActor.send(processTransactionStateRequestPart(&transactionStateResolveContext, request));
}
}
}