Avoid creating LogPushData when PROXY_USE_RESOLVER_PRIVATE_MUTATIONS is off

To save CPU cost, especially for creating LogSystemConfig.
This commit is contained in:
Jingyu Zhou 2022-05-04 21:19:52 -07:00
parent 254da4d796
commit b970d507c0
1 changed files with 21 additions and 17 deletions

View File

@ -18,6 +18,8 @@
* limitations under the License.
*/
#include <memory>
#include "fdbclient/NativeAPI.actor.h"
#include "fdbclient/Notified.h"
#include "fdbclient/StorageServerInterface.h"
@ -310,20 +312,22 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
auto& stateTransactions = stateTransactionsPair.second;
int64_t stateMutations = 0;
int64_t stateBytes = 0;
LogPushData toCommit(self->logSystem); // For accumulating private mutations
ResolverData resolverData(self->dbgid,
self->logSystem,
self->txnStateStore,
&self->keyInfo,
&toCommit,
self->forceRecovery,
req.version + 1,
&self->storageCache,
&self->tssMapping);
std::unique_ptr<LogPushData> toCommit(nullptr); // For accumulating private mutations
std::unique_ptr<ResolverData> resolverData(nullptr);
bool isLocked = false;
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
auto lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
isLocked = lockedKey.present() && lockedKey.get().size();
toCommit.reset(new LogPushData(self->logSystem));
resolverData.reset(new ResolverData(self->dbgid,
self->logSystem,
self->txnStateStore,
&self->keyInfo,
toCommit.get(),
self->forceRecovery,
req.version + 1,
&self->storageCache,
&self->tssMapping));
}
for (int t : req.txnStateTransactions) {
stateMutations += req.transactions[t].mutations.size();
@ -343,7 +347,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
SpanContext spanContext =
req.transactions[t].spanContext.present() ? req.transactions[t].spanContext.get() : SpanContext();
applyMetadataMutations(spanContext, resolverData, req.transactions[t].mutations);
applyMetadataMutations(spanContext, *resolverData, req.transactions[t].mutations);
}
TEST(self->forceRecovery); // Resolver detects forced recovery
}
@ -361,20 +365,20 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
// If shardChanged at or before this commit version, the proxy may have computed
// the wrong set of groups. Then we need to broadcast to all groups below.
stateTransactionsPair.first = toCommit.isShardChanged();
stateTransactionsPair.first = toCommit && toCommit->isShardChanged();
bool shardChanged = self->recentStateTransactionsInfo.applyStateTxnsToBatchReply(
&reply, firstUnseenVersion, req.version, toCommit.isShardChanged());
&reply, firstUnseenVersion, req.version, toCommit && toCommit->isShardChanged());
// Adds private mutation messages to the reply message.
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
auto privateMutations = toCommit.getAllMessages();
auto privateMutations = toCommit->getAllMessages();
for (const auto& mutations : privateMutations) {
reply.privateMutations.push_back(reply.arena, mutations);
reply.arena.dependsOn(mutations.arena());
}
// merge mutation tags with sent client tags
toCommit.saveTags(reply.writtenTags);
reply.privateMutationCount = toCommit.getMutationCount();
toCommit->saveTags(reply.writtenTags);
reply.privateMutationCount = toCommit->getMutationCount();
}
//TraceEvent("ResolveBatch", self->dbgid).detail("PrevVersion", req.prevVersion).detail("Version", req.version).detail("StateTransactionVersions", self->recentStateTransactionsInfo.size()).detail("StateBytes", stateBytes).detail("FirstVersion", self->recentStateTransactionsInfo.empty() ? -1 : self->recentStateTransactionsInfo.firstVersion()).detail("StateMutationsIn", req.txnStateTransactions.size()).detail("StateMutationsOut", reply.stateMutations.size()).detail("From", proxyAddress);
@ -418,7 +422,7 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
writtenTLogs.insert(i);
}
} else {
toCommit.getLocations(reply.writtenTags, writtenTLogs);
toCommit->getLocations(reply.writtenTags, writtenTLogs);
}
if (self->tpcvVector[0] == invalidVersion) {
std::fill(self->tpcvVector.begin(), self->tpcvVector.end(), req.prevVersion);