Add knob PROXY_USE_RESOLVER_PRIVATE_MUTATIONS

To control proxy to use private mutations from resolvers or not.
This commit is contained in:
Jingyu Zhou 2021-10-04 13:52:40 -07:00 committed by Dan Lambright
parent c1d7b03087
commit 0dc9c607f4
7 changed files with 47 additions and 25 deletions

View File

@ -192,8 +192,13 @@ struct CommitTransactionRef {
template <class Ar>
force_inline void serialize(Ar& ar) {
if constexpr (is_fb_function<Ar>) {
serializer(
ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot, report_conflicting_keys, spanContext);
serializer(ar,
read_conflict_ranges,
write_conflict_ranges,
mutations,
read_snapshot,
report_conflicting_keys,
spanContext);
} else {
serializer(ar, read_conflict_ranges, write_conflict_ranges, mutations, read_snapshot);
if (ar.protocolVersion().hasReportConflictingKeys()) {

View File

@ -411,6 +411,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( TXN_STATE_SEND_AMOUNT, 4 );
init( REPORT_TRANSACTION_COST_ESTIMATION_DELAY, 0.1 );
init( PROXY_REJECT_BATCH_QUEUED_TOO_LONG, true );
init( PROXY_USE_RESOLVER_PRIVATE_MUTATIONS, false );
init( RESET_MASTER_BATCHES, 200 );
init( RESET_RESOLVER_BATCHES, 200 );

View File

@ -338,6 +338,7 @@ public:
int TXN_STATE_SEND_AMOUNT;
double REPORT_TRANSACTION_COST_ESTIMATION_DELAY;
bool PROXY_REJECT_BATCH_QUEUED_TOO_LONG;
bool PROXY_USE_RESOLVER_PRIVATE_MUTATIONS;
int RESET_MASTER_BATCHES;
int RESET_RESOLVER_BATCHES;

View File

@ -20,9 +20,10 @@
#ifndef FDBSERVER_APPLYMETADATAMUTATION_H
#define FDBSERVER_APPLYMETADATAMUTATION_H
#include <cstddef>
#pragma once
#include <cstddef>
#include "fdbclient/BackupAgent.actor.h"
#include "fdbclient/MutationList.h"
#include "fdbclient/Notified.h"

View File

@ -930,7 +930,7 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
self->arena,
pProxyCommitData->logSystem,
trs[t].transaction.mutations,
/* pToCommit= */ nullptr,
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS ? nullptr : &self->toCommit,
self->forceRecovery,
self->commitVersion + 1,
/* initialCommit= */ false);
@ -941,10 +941,10 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
self->forceRecovery = false;
}
}
auto privateMutations = self->toCommit.getAllMessages();
ResolveTransactionBatchReply& reply = self->resolution[0];
ASSERT_WE_THINK(privateMutations.size() == reply.privateMutations.size());
self->toCommit.setMutations(reply.privateMutationCount, reply.privateMutations);
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
ResolveTransactionBatchReply& reply = self->resolution[0];
self->toCommit.setMutations(reply.privateMutationCount, reply.privateMutations);
}
if (self->forceRecovery) {
for (; t < trs.size(); t++)
self->committed[t] = ConflictBatch::TransactionConflict;

View File

@ -255,18 +255,21 @@ ACTOR Future<Void> resolveBatch(Reference<Resolver> self, ResolveTransactionBatc
req.transactions[t].mutations));
// Generate private mutations for metadata mutations
if (reply.committed[t] == ConflictBatch::TransactionCommitted) {
if (reply.committed[t] == ConflictBatch::TransactionCommitted &&
SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
applyMetadataMutations(req.transactions[t].spanContext, resolverData, req.transactions[t].mutations);
}
}
// Adds private mutation messages to the reply message.
auto privateMutations = toCommit.getAllMessages();
for (const auto& mutations : privateMutations) {
reply.privateMutations.push_back(reply.arena, mutations);
reply.arena.dependsOn(mutations.arena());
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
auto privateMutations = toCommit.getAllMessages();
for (const auto& mutations : privateMutations) {
reply.privateMutations.push_back(reply.arena, mutations);
reply.arena.dependsOn(mutations.arena());
}
reply.privateMutationCount = toCommit.getMutationCount();
}
reply.privateMutationCount = toCommit.getMutationCount();;
self->resolvedStateTransactions += req.txnStateTransactions.size();
self->resolvedStateMutations += stateMutations;
@ -388,7 +391,7 @@ struct TransactionStateResolveContext {
TransactionStateResolveContext(Reference<Resolver> pResolverData_, PromiseStream<Future<Void>>* pActors_)
: pResolverData(pResolverData_), pTxnStateStore(pResolverData_->txnStateStore), pActors(pActors_) {
ASSERT(pTxnStateStore != nullptr);
ASSERT(pTxnStateStore != nullptr || !SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS);
}
};
@ -536,18 +539,22 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver,
}
// Initialize txnStateStore
self->logSystem = ILogSystem::fromServerDBInfo(resolver.id(), db->get(), false, addActor);
state PromiseStream<Future<Void>> addActor;
state Future<Void> onError =
transformError(actorCollection(addActor.getFuture()), broken_promise(), master_resolver_failed());
self->logSystem = ILogSystem::fromServerDBInfo(resolver.id(), db->get(), false, addActor);
self->logAdapter = new LogSystemDiskQueueAdapter(self->logSystem, Reference<AsyncVar<PeekTxsInfo>>(), 1, false);
self->txnStateStore = keyValueStoreLogSystem(self->logAdapter, resolver.id(), 2e9, true, true, true);
state TransactionStateResolveContext transactionStateResolveContext;
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
self->logAdapter = new LogSystemDiskQueueAdapter(self->logSystem, Reference<AsyncVar<PeekTxsInfo>>(), 1, false);
self->txnStateStore = keyValueStoreLogSystem(self->logAdapter, resolver.id(), 2e9, true, true, true);
// wait for txnStateStore recovery
wait(success(self->txnStateStore->readValue(StringRef())));
// wait for txnStateStore recovery
wait(success(self->txnStateStore->readValue(StringRef())));
// This has to be declared after the self->txnStateStore get initialized
state TransactionStateResolveContext transactionStateResolveContext(self, &addActor);
// This has to be declared after the self->txnStateStore get initialized
transactionStateResolveContext = TransactionStateResolveContext(self, &addActor);
}
loop choose {
when(ResolveTransactionBatchRequest batch = waitNext(resolver.resolve.getFuture())) {
@ -572,7 +579,11 @@ ACTOR Future<Void> resolverCore(ResolverInterface resolver,
doPollMetrics = delay(SERVER_KNOBS->SAMPLE_POLL_TIME);
}
when(TxnStateRequest request = waitNext(resolver.txnState.getFuture())) {
addActor.send(processTransactionStateRequestPart(&transactionStateResolveContext, request));
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
addActor.send(processTransactionStateRequestPart(&transactionStateResolveContext, request));
} else {
ASSERT(false);
}
}
}
}

View File

@ -946,8 +946,11 @@ ACTOR Future<Void> sendInitialCommitToResolvers(Reference<MasterData> self) {
for (auto& it : self->commitProxies) {
endpoints.push_back(it.txnState.getEndpoint());
}
for (auto& it : self->resolvers) {
endpoints.push_back(it.txnState.getEndpoint());
if (SERVER_KNOBS->PROXY_USE_RESOLVER_PRIVATE_MUTATIONS) {
// Broadcasts transaction state store to resolvers.
for (auto& it : self->resolvers) {
endpoints.push_back(it.txnState.getEndpoint());
}
}
loop {