Merge pull request #3575 from xis19/refactor

Refactor the fdbserver/MasterProxyServer.actor.cpp:commitBatch
This commit is contained in:
Xiaoge Su 2020-08-06 17:59:39 -07:00 committed by GitHub
commit 0bed41c0d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 994 additions and 650 deletions

View File

@ -45,9 +45,14 @@ Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInf
// It is incredibly important that any modifications to txnStateStore are done in such a way that
// the same operations will be done on all proxies at the same time. Otherwise, the data stored in
// txnStateStore will become corrupted.
void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRef> const& mutations, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool *confChange, Reference<ILogSystem> logSystem, Version popVersion,
KeyRangeMap<std::set<Key> >* vecBackupKeys, KeyRangeMap<ServerCacheInfo>* keyInfo, KeyRangeMap<bool>* cacheInfo, std::map<Key, applyMutationsData>* uid_applyMutationsData, RequestStream<CommitTransactionRequest> commit,
Database cx, NotifiedVersion* commitVersion, std::map<UID, Reference<StorageInfo>>* storageCache, std::map<Tag, Version>* tag_popped, bool initialCommit ) {
void applyMetadataMutations(UID const& dbgid, Arena& arena, VectorRef<MutationRef> const& mutations,
IKeyValueStore* txnStateStore, LogPushData* toCommit, bool& confChange,
Reference<ILogSystem> logSystem, Version popVersion,
KeyRangeMap<std::set<Key>>* vecBackupKeys, KeyRangeMap<ServerCacheInfo>* keyInfo,
KeyRangeMap<bool>* cacheInfo, std::map<Key, ApplyMutationsData>* uid_applyMutationsData,
RequestStream<CommitTransactionRequest> commit, Database cx, NotifiedVersion* commitVersion,
std::map<UID, Reference<StorageInfo>>* storageCache, std::map<Tag, Version>* tag_popped,
bool initialCommit) {
//std::map<keyRef, vector<uint16_t>> cacheRangeInfo;
std::map<KeyRef, MutationRef> cachedRangeInfo;
for (auto const& m : mutations) {
@ -175,7 +180,7 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
.detail("M", m.toString())
.detail("PrevValue", t.present() ? t.get() : LiteralStringRef("(none)"))
.detail("ToCommit", toCommit!=nullptr);
if(confChange) *confChange = true;
confChange = true;
}
}
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
@ -293,7 +298,7 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
Version requested = BinaryReader::fromStringRef<Version>(m.param2, Unversioned());
TraceEvent("MinRequiredCommitVersion", dbgid).detail("Min", requested).detail("Current", popVersion).detail("HasConf", !!confChange);
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
if (confChange) *confChange = true;
confChange = true;
TEST(true); // Recovering at a higher version.
}
}
@ -313,7 +318,7 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
if(!initialCommit) txnStateStore->clear(range & configKeys);
if(!excludedServersKeys.contains(range) && !failedServersKeys.contains(range)) {
TraceEvent("MutationRequiresRestart", dbgid).detail("M", m.toString());
if(confChange) *confChange = true;
confChange = true;
}
}
if ( serverListKeys.intersects( range )) {
@ -329,11 +334,14 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
auto serverKeysCleared = txnStateStore->readRange( range & serverTagKeys ).get(); // read is expected to be immediately available
for(auto &kv : serverKeysCleared) {
Tag tag = decodeServerTagValue(kv.value);
TraceEvent("ServerTagRemove").detail("PopVersion", popVersion).detail("Tag", tag.toString()).detail("Server", decodeServerTagKey(kv.key));
logSystem->pop( popVersion, decodeServerTagValue(kv.value) );
TraceEvent("ServerTagRemove")
.detail("PopVersion", popVersion)
.detail("Tag", tag.toString())
.detail("Server", decodeServerTagKey(kv.key));
logSystem->pop(popVersion, decodeServerTagValue(kv.value));
(*tag_popped)[tag] = popVersion;
if(toCommit) {
if (toCommit) {
MutationRef privatized = m;
privatized.param1 = kv.key.withPrefix(systemKeys.begin, arena);
privatized.param2 = keyAfter(kv.key, arena).withPrefix(systemKeys.begin, arena);
@ -536,3 +544,31 @@ void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRe
}
}
}
void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference<ILogSystem> logSystem,
const VectorRef<MutationRef>& mutations, LogPushData* toCommit, bool& confChange,
Version popVersion, bool initialCommit) {
std::map<Key, ApplyMutationsData>* uid_applyMutationsData = nullptr;
if (proxyCommitData.firstProxy) {
uid_applyMutationsData = &proxyCommitData.uid_applyMutationsData;
}
applyMetadataMutations(proxyCommitData.dbgid, arena, mutations, proxyCommitData.txnStateStore, toCommit, confChange,
logSystem, popVersion, &proxyCommitData.vecBackupKeys, &proxyCommitData.keyInfo,
&proxyCommitData.cacheInfo, uid_applyMutationsData, proxyCommitData.commit,
proxyCommitData.cx, &proxyCommitData.committedVersion, &proxyCommitData.storageCache,
&proxyCommitData.tag_popped, initialCommit);
}
void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef<MutationRef>& mutations,
IKeyValueStore* txnStateStore) {
bool confChange; // Dummy variable, not used.
applyMetadataMutations(dbgid, arena, mutations, txnStateStore, /* toCommit= */ nullptr, confChange,
Reference<ILogSystem>(), /* popVersion= */ 0, /* vecBackupKeys= */ nullptr,
/* keyInfo= */ nullptr, /* cacheInfo= */ nullptr, /* uid_applyMutationsData= */ nullptr,
RequestStream<CommitTransactionRequest>(), Database(), /* commitVersion= */ nullptr,
/* storageCache= */ nullptr, /* tag_popped= */ nullptr, /* initialCommit= */ false);
}

View File

@ -29,6 +29,7 @@
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogProtocolMessage.h"
#include "fdbserver/ProxyCommitData.actor.h"
inline bool isMetadataMutation(MutationRef const& m) {
// FIXME: This is conservative - not everything in system keyspace is necessarily processed by applyMetadataMutations
@ -36,16 +37,12 @@ inline bool isMetadataMutation(MutationRef const& m) {
(m.type == MutationRef::ClearRange && m.param2.size() && m.param2[0] == systemKeys.begin[0] && !nonMetadataSystemKeys.contains(KeyRangeRef(m.param1, m.param2)) );
}
struct applyMutationsData {
Future<Void> worker;
Version endVersion;
Reference<KeyRangeMap<Version>> keyVersion;
};
Reference<StorageInfo> getStorageInfo(UID id, std::map<UID, Reference<StorageInfo>>* storageCache, IKeyValueStore* txnStateStore);
void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<MutationRef> const& mutations, IKeyValueStore* txnStateStore, LogPushData* toCommit, bool *confChange, Reference<ILogSystem> logSystem = Reference<ILogSystem>(), Version popVersion = 0,
KeyRangeMap<std::set<Key> >* vecBackupKeys = nullptr, KeyRangeMap<ServerCacheInfo>* keyInfo = nullptr, KeyRangeMap<bool>* cacheInfo = nullptr, std::map<Key, applyMutationsData>* uid_applyMutationsData = nullptr, RequestStream<CommitTransactionRequest> commit = RequestStream<CommitTransactionRequest>(),
Database cx = Database(), NotifiedVersion* commitVersion = nullptr, std::map<UID, Reference<StorageInfo>>* storageCache = nullptr, std::map<Tag, Version>* tag_popped = nullptr, bool initialCommit = false );
void applyMetadataMutations(ProxyCommitData& proxyCommitData, Arena& arena, Reference<ILogSystem> logSystem,
const VectorRef<MutationRef>& mutations, LogPushData* pToCommit, bool& confChange,
Version popVersion, bool initialCommit);
void applyMetadataMutations(const UID& dbgid, Arena& arena, const VectorRef<MutationRef>& mutations,
IKeyValueStore* txnStateStore);
#endif

View File

@ -57,6 +57,7 @@ set(FDBSERVER_SRCS
OldTLogServer_6_0.actor.cpp
OldTLogServer_6_2.actor.cpp
Orderer.actor.h
ProxyCommitData.actor.h
pubsub.actor.cpp
pubsub.h
QuietDatabase.actor.cpp

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,269 @@
/*
* ProxyCommitData.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2020 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#if defined(NO_INTELLISENSE) && !defined(FDBSERVER_PROXYCOMMITDATA_ACTOR_G_H)
#define FDBSERVER_PROXYCOMMITDATA_ACTOR_G_H
#include "fdbserver/ProxyCommitData.actor.g.h"
#elif !defined(FDBSERVER_PROXYCOMMITDATA_ACTOR_H)
#define FDBSERVER_PROXYCOMMITDATA_ACTOR_H
#include "fdbclient/FDBTypes.h"
#include "fdbrpc/Stats.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "flow/IRandom.h"
#include "flow/actorcompiler.h" // This must be the last #include.
DESCR struct SingleKeyMutation {
Standalone<StringRef> shardBegin;
Standalone<StringRef> shardEnd;
int64_t tag1;
int64_t tag2;
int64_t tag3;
};
struct ApplyMutationsData {
Future<Void> worker;
Version endVersion;
Reference<KeyRangeMap<Version>> keyVersion;
};
struct ProxyStats {
CounterCollection cc;
Counter txnRequestIn, txnRequestOut, txnRequestErrors;
Counter txnStartIn, txnStartOut, txnStartBatch;
Counter txnSystemPriorityStartIn, txnSystemPriorityStartOut;
Counter txnBatchPriorityStartIn, txnBatchPriorityStartOut;
Counter txnDefaultPriorityStartIn, txnDefaultPriorityStartOut;
Counter txnCommitIn, txnCommitVersionAssigned, txnCommitResolving, txnCommitResolved, txnCommitOut,
txnCommitOutSuccess, txnCommitErrors;
Counter txnConflicts;
Counter txnThrottled;
Counter commitBatchIn, commitBatchOut;
Counter mutationBytes;
Counter mutations;
Counter conflictRanges;
Counter keyServerLocationIn, keyServerLocationOut, keyServerLocationErrors;
Version lastCommitVersionAssigned;
LatencySample commitLatencySample;
LatencySample grvLatencySample;
LatencyBands commitLatencyBands;
LatencyBands grvLatencyBands;
Future<Void> logger;
int recentRequests;
Deque<int> requestBuckets;
double lastBucketBegin;
double bucketInterval;
void updateRequestBuckets() {
while (now() - lastBucketBegin > bucketInterval) {
lastBucketBegin += bucketInterval;
recentRequests -= requestBuckets.front();
requestBuckets.pop_front();
requestBuckets.push_back(0);
}
}
void addRequest() {
updateRequestBuckets();
++recentRequests;
++requestBuckets.back();
}
int getRecentRequests() {
updateRequestBuckets();
return recentRequests * FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE /
(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE - (lastBucketBegin + bucketInterval - now()));
}
explicit ProxyStats(UID id, Version* pVersion, NotifiedVersion* pCommittedVersion,
int64_t* commitBatchesMemBytesCountPtr)
: cc("ProxyStats", id.toString()), recentRequests(0), lastBucketBegin(now()),
bucketInterval(FLOW_KNOBS->BASIC_LOAD_BALANCE_UPDATE_RATE / FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS),
txnRequestIn("TxnRequestIn", cc), txnRequestOut("TxnRequestOut", cc), txnRequestErrors("TxnRequestErrors", cc),
txnStartIn("TxnStartIn", cc), txnStartOut("TxnStartOut", cc), txnStartBatch("TxnStartBatch", cc),
txnSystemPriorityStartIn("TxnSystemPriorityStartIn", cc),
txnSystemPriorityStartOut("TxnSystemPriorityStartOut", cc),
txnBatchPriorityStartIn("TxnBatchPriorityStartIn", cc),
txnBatchPriorityStartOut("TxnBatchPriorityStartOut", cc),
txnDefaultPriorityStartIn("TxnDefaultPriorityStartIn", cc),
txnDefaultPriorityStartOut("TxnDefaultPriorityStartOut", cc), txnCommitIn("TxnCommitIn", cc),
txnCommitVersionAssigned("TxnCommitVersionAssigned", cc), txnCommitResolving("TxnCommitResolving", cc),
txnCommitResolved("TxnCommitResolved", cc), txnCommitOut("TxnCommitOut", cc),
txnCommitOutSuccess("TxnCommitOutSuccess", cc), txnCommitErrors("TxnCommitErrors", cc),
txnConflicts("TxnConflicts", cc), txnThrottled("TxnThrottled", cc), commitBatchIn("CommitBatchIn", cc),
commitBatchOut("CommitBatchOut", cc), mutationBytes("MutationBytes", cc), mutations("Mutations", cc),
conflictRanges("ConflictRanges", cc), keyServerLocationIn("KeyServerLocationIn", cc),
keyServerLocationOut("KeyServerLocationOut", cc), keyServerLocationErrors("KeyServerLocationErrors", cc),
lastCommitVersionAssigned(0),
commitLatencySample("CommitLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
grvLatencySample("GRVLatencyMetrics", id, SERVER_KNOBS->LATENCY_METRICS_LOGGING_INTERVAL,
SERVER_KNOBS->LATENCY_SAMPLE_SIZE),
commitLatencyBands("CommitLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY),
grvLatencyBands("GRVLatencyMetrics", id, SERVER_KNOBS->STORAGE_LOGGING_DELAY) {
specialCounter(cc, "LastAssignedCommitVersion", [this]() { return this->lastCommitVersionAssigned; });
specialCounter(cc, "Version", [pVersion]() { return *pVersion; });
specialCounter(cc, "CommittedVersion", [pCommittedVersion]() { return pCommittedVersion->get(); });
specialCounter(cc, "CommitBatchesMemBytesCount",
[commitBatchesMemBytesCountPtr]() { return *commitBatchesMemBytesCountPtr; });
logger = traceCounters("ProxyMetrics", id, SERVER_KNOBS->WORKER_LOGGING_INTERVAL, &cc, "ProxyMetrics");
for (int i = 0; i < FLOW_KNOBS->BASIC_LOAD_BALANCE_BUCKETS; i++) {
requestBuckets.push_back(0);
}
}
};
struct ProxyCommitData {
UID dbgid;
int64_t commitBatchesMemBytesCount;
ProxyStats stats;
MasterInterface master;
vector<ResolverInterface> resolvers;
LogSystemDiskQueueAdapter* logAdapter;
Reference<ILogSystem> logSystem;
IKeyValueStore* txnStateStore;
NotifiedVersion committedVersion; // Provided that this recovery has succeeded or will succeed, this version is
// fully committed (durable)
Version minKnownCommittedVersion; // No version smaller than this one will be used as the known committed version
// during recovery
Version version; // The version at which txnStateStore is up to date
Promise<Void> validState; // Set once txnStateStore and version are valid
double lastVersionTime;
KeyRangeMap<std::set<Key>> vecBackupKeys;
uint64_t commitVersionRequestNumber;
uint64_t mostRecentProcessedRequestNumber;
KeyRangeMap<Deque<std::pair<Version, int>>> keyResolvers;
KeyRangeMap<ServerCacheInfo> keyInfo;
KeyRangeMap<bool> cacheInfo;
std::map<Key, ApplyMutationsData> uid_applyMutationsData;
bool firstProxy;
double lastCoalesceTime;
bool locked;
Optional<Value> metadataVersion;
double commitBatchInterval;
int64_t localCommitBatchesStarted;
NotifiedVersion latestLocalCommitBatchResolving;
NotifiedVersion latestLocalCommitBatchLogging;
RequestStream<GetReadVersionRequest> getConsistentReadVersion;
RequestStream<CommitTransactionRequest> commit;
Database cx;
Reference<AsyncVar<ServerDBInfo>> db;
EventMetricHandle<SingleKeyMutation> singleKeyMutationEvent;
std::map<UID, Reference<StorageInfo>> storageCache;
std::map<Tag, Version> tag_popped;
Deque<std::pair<Version, Version>> txsPopVersions;
Version lastTxsPop;
bool popRemoteTxs;
vector<Standalone<StringRef>> whitelistedBinPathVec;
Optional<LatencyBandConfig> latencyBandConfig;
double lastStartCommit;
double lastCommitLatency;
int updateCommitRequests = 0;
NotifiedDouble lastCommitTime;
vector<double> commitComputePerOperation;
TransactionTagMap<TransactionCommitCostEstimation> transactionTagCommitCostEst;
// The tag related to a storage server rarely change, so we keep a vector of tags for each key range to be slightly
// more CPU efficient. When a tag related to a storage server does change, we empty out all of these vectors to
// signify they must be repopulated. We do not repopulate them immediately to avoid a slow task.
const vector<Tag>& tagsForKey(StringRef key) {
auto& tags = keyInfo[key].tags;
if (!tags.size()) {
auto& r = keyInfo.rangeContaining(key).value();
for (auto info : r.src_info) {
r.tags.push_back(info->tag);
}
for (auto info : r.dest_info) {
r.tags.push_back(info->tag);
}
uniquify(r.tags);
return r.tags;
}
return tags;
}
bool needsCacheTag(KeyRangeRef range) {
auto ranges = cacheInfo.intersectingRanges(range);
for (auto r : ranges) {
if (r.value()) {
return true;
}
}
return false;
}
void updateLatencyBandConfig(Optional<LatencyBandConfig> newLatencyBandConfig) {
if (newLatencyBandConfig.present() != latencyBandConfig.present() ||
(newLatencyBandConfig.present() &&
newLatencyBandConfig.get().grvConfig != latencyBandConfig.get().grvConfig)) {
TraceEvent("LatencyBandGrvUpdatingConfig").detail("Present", newLatencyBandConfig.present());
stats.grvLatencyBands.clearBands();
if (newLatencyBandConfig.present()) {
for (auto band : newLatencyBandConfig.get().grvConfig.bands) {
stats.grvLatencyBands.addThreshold(band);
}
}
}
if (newLatencyBandConfig.present() != latencyBandConfig.present() ||
(newLatencyBandConfig.present() &&
newLatencyBandConfig.get().commitConfig != latencyBandConfig.get().commitConfig)) {
TraceEvent("LatencyBandCommitUpdatingConfig").detail("Present", newLatencyBandConfig.present());
stats.commitLatencyBands.clearBands();
if (newLatencyBandConfig.present()) {
for (auto band : newLatencyBandConfig.get().commitConfig.bands) {
stats.commitLatencyBands.addThreshold(band);
}
}
}
latencyBandConfig = newLatencyBandConfig;
}
ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion,
Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit,
Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
logAdapter(NULL), txnStateStore(NULL), popRemoteTxs(false), committedVersion(recoveryTransactionVersion),
version(0), minKnownCommittedVersion(0), lastVersionTime(0), commitVersionRequestNumber(1),
mostRecentProcessedRequestNumber(0), getConsistentReadVersion(getConsistentReadVersion), commit(commit),
lastCoalesceTime(0), localCommitBatchesStarted(0), locked(false),
commitBatchInterval(SERVER_KNOBS->COMMIT_TRANSACTION_BATCH_INTERVAL_MIN), firstProxy(firstProxy),
cx(openDBOnServer(db, TaskPriority::DefaultEndpoint, true, true)), db(db),
singleKeyMutationEvent(LiteralStringRef("SingleKeyMutation")), commitBatchesMemBytesCount(0), lastTxsPop(0),
lastStartCommit(0), lastCommitLatency(SERVER_KNOBS->REQUIRED_MIN_RECOVERY_DURATION), lastCommitTime(0) {
commitComputePerOperation.resize(SERVER_KNOBS->PROXY_COMPUTE_BUCKETS, 0.0);
}
};
#include "flow/unactorcompiler.h"
#endif // FDBSERVER_PROXYCOMMITDATA_H

View File

@ -18,31 +18,34 @@
* limitations under the License.
*/
#include "flow/ActorCollection.h"
#include "fdbrpc/PerfMetric.h"
#include "flow/Trace.h"
#include "fdbrpc/FailureMonitor.h"
#include <iterator>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/SystemData.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/Knobs.h"
#include <iterator>
#include "fdbrpc/FailureMonitor.h"
#include "fdbrpc/PerfMetric.h"
#include "fdbrpc/sim_validation.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/BackupProgress.actor.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/ConflictSet.h"
#include "fdbserver/CoordinatedState.h"
#include "fdbserver/CoordinationInterface.h" // copy constructors for ServerCoordinators class
#include "fdbrpc/sim_validation.h"
#include "fdbserver/DBCoreState.h"
#include "fdbserver/DataDistribution.actor.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/Knobs.h"
#include "fdbserver/LogSystem.h"
#include "fdbserver/LogSystemDiskQueueAdapter.h"
#include "fdbserver/IKeyValueStore.h"
#include "fdbserver/ApplyMetadataMutation.h"
#include "fdbserver/MasterInterface.h"
#include "fdbserver/ProxyCommitData.actor.h"
#include "fdbserver/RecoveryState.h"
#include "fdbserver/ServerDBInfo.h"
#include "fdbserver/WaitFailure.h"
#include "fdbserver/WorkerInterface.actor.h"
#include "flow/ActorCollection.h"
#include "flow/Trace.h"
#include "flow/actorcompiler.h" // This must be the last #include.
using std::vector;
@ -1561,7 +1564,8 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
}
}
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()), self->txnStateStore, nullptr, nullptr);
applyMetadataMutations(self->dbgid, recoveryCommitRequest.arena, tr.mutations.slice(mmApplied, tr.mutations.size()),
self->txnStateStore);
mmApplied = tr.mutations.size();
tr.read_snapshot = self->recoveryTransactionVersion; // lastEpochEnd would make more sense, but isn't in the initial window of the resolver(s)