Add a knob PROXY_REJECT_BATCH_QUEUED_TOO_LONG

Disable the proxy rejection feature for the backup workloads, because
rejecting their batches leads to ApplyMutationsError.
This commit is contained in:
Jingyu Zhou 2020-11-28 19:58:39 -08:00
parent 5cb0b138be
commit df5293e2be
8 changed files with 40 additions and 6 deletions

View File

@ -344,6 +344,7 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( MAX_PROXY_COMPUTE, 2.0 );
init( PROXY_COMPUTE_BUCKETS, 20000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
init( PROXY_REJECT_BATCH_QUEUED_TOO_LONG, true );
init( RESET_MASTER_BATCHES, 200 );
init( RESET_RESOLVER_BATCHES, 200 );

View File

@ -289,6 +289,7 @@ public:
double MAX_PROXY_COMPUTE;
int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE;
bool PROXY_REJECT_BATCH_QUEUED_TOO_LONG;
int RESET_MASTER_BATCHES;
int RESET_RESOLVER_BATCHES;

View File

@ -627,19 +627,27 @@ ACTOR Future<Void> commitBatch(
TEST(self->latestLocalCommitBatchResolving.get() < localBatchNumber-1); // Queuing pre-resolution commit processing
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
double queuingDelay = g_network->timer() - timeStart;
if (queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND ||
(BUGGIFY && g_network->isSimulated() && deterministicRandom()->random01() < 0.01 && trs.size() > 0 &&
!trs[0].transaction.mutations[0].param1.startsWith(LiteralStringRef("\xff")))) {
if ((queuingDelay > (double)SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS / SERVER_KNOBS->VERSIONS_PER_SECOND ||
(BUGGIFY && g_network->isSimulated() && deterministicRandom()->random01() < 0.01)) &&
SERVER_KNOBS->PROXY_REJECT_BATCH_QUEUED_TOO_LONG &&
trs.size() > 0 && !trs[0].transaction.mutations.empty() && !trs[0].transaction.mutations[0].param1.startsWith(LiteralStringRef("\xff"))) {
// Disabled for the recovery transaction. Otherwise, recovery can't finish and keeps triggering more recoveries.
TEST(true); // Reject transactions in the batch
TraceEvent("ProxyReject", self->dbgid).detail("Delay", queuingDelay).detail("N", trs.size());
for (const auto m : trs[0].transaction.mutations) {
TraceEvent("ProxyReject", self->dbgid).detail("Mutation", m.toString());
TraceEvent("ProxyReject", self->dbgid).detail("Delay", queuingDelay).detail("N", trs.size()).detail("BatchNumber", localBatchNumber);
int i = 0;
for (const auto tr : trs) {
int j = 0;
for (const auto& m : tr.transaction.mutations) {
TraceEvent("ProxyReject", self->dbgid).detail("T", i).detail("M", j).detail("Mutation", m.toString());
j++;
}
i++;
}
ASSERT(self->latestLocalCommitBatchResolving.get() == localBatchNumber - 1);
self->latestLocalCommitBatchResolving.set(localBatchNumber);
wait(self->latestLocalCommitBatchLogging.whenAtLeast(localBatchNumber-1));
ASSERT(self->latestLocalCommitBatchLogging.get() == localBatchNumber - 1);
self->latestLocalCommitBatchLogging.set(localBatchNumber);
for (const auto& tr : trs) {
tr.reply.sendError(not_committed());

View File

@ -61,6 +61,9 @@ struct AtomicRestoreWorkload : TestWorkload {
ACTOR static Future<Void> _start(Database cx, AtomicRestoreWorkload* self) {
state FileBackupAgent backupAgent;
// Disable proxy rejection
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = false;
wait( delay(self->startAfter * deterministicRandom()->random01()) );
TraceEvent("AtomicRestore_Start");
@ -105,6 +108,7 @@ struct AtomicRestoreWorkload : TestWorkload {
}
TraceEvent("AtomicRestore_Done");
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = true;
return Void();
}
};

View File

@ -152,6 +152,9 @@ struct AtomicSwitchoverWorkload : TestWorkload {
state DatabaseBackupAgent backupAgent(cx);
state DatabaseBackupAgent restoreAgent(self->extraDB);
// Disable proxy rejection to avoid ApplyMutationsError
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = false;
TraceEvent("AS_Wait1");
wait(success( backupAgent.waitBackup(self->extraDB, BackupAgentBase::getDefaultTag(), false) ));
TraceEvent("AS_Ready1");
@ -177,6 +180,8 @@ struct AtomicSwitchoverWorkload : TestWorkload {
g_simulator.drAgents = ISimulator::NoBackupAgents;
}
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = true;
return Void();
}
};

View File

@ -22,6 +22,8 @@
#include "fdbclient/ManagementAPI.actor.h"
#include "fdbclient/NativeAPI.actor.h"
#include "fdbserver/workloads/workloads.actor.h"
#include "fdbserver/Knobs.h"
#include "flow/actorcompiler.h" // This must be the last #include.
struct BackupToDBAbort : TestWorkload {
@ -54,6 +56,8 @@ struct BackupToDBAbort : TestWorkload {
ACTOR static Future<Void> _setup(BackupToDBAbort* self, Database cx) {
state DatabaseBackupAgent backupAgent(cx);
try {
// Disable proxy rejection to avoid ApplyMutationsError
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = false;
TraceEvent("BDBA_Submit1");
wait( backupAgent.submitBackup(self->extraDB, BackupAgentBase::getDefaultTag(), self->backupRanges, false, StringRef(), StringRef(), true) );
TraceEvent("BDBA_Submit2");
@ -61,6 +65,7 @@ struct BackupToDBAbort : TestWorkload {
if( e.code() != error_code_backup_duplicate )
throw;
}
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = true;
return Void();
}

View File

@ -442,6 +442,9 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
TraceEvent("BARW_Arguments").detail("BackupTag", printable(self->backupTag)).detail("BackupAfter", self->backupAfter)
.detail("AbortAndRestartAfter", self->abortAndRestartAfter).detail("DifferentialAfter", self->stopDifferentialAfter);
// Disable proxy rejection to avoid ApplyMutationsError
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = false;
state UID randomID = nondeterministicRandom()->randomUniqueID();
// Increment the backup agent requests
@ -575,6 +578,8 @@ struct BackupToDBCorrectnessWorkload : TestWorkload {
throw;
}
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = true;
return Void();
}
};

View File

@ -346,6 +346,9 @@ struct BackupToDBUpgradeWorkload : TestWorkload {
state UID logUid;
state Version commitVersion;
// Disable proxy rejection to avoid ApplyMutationsError
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = false;
state Future<Void> stopDifferential = delay(self->stopDifferentialAfter);
state Future<Void> waitUpgrade = backupAgent.waitUpgradeToLatestDrVersion(self->extraDB, self->backupTag);
wait(success(stopDifferential) && success(waitUpgrade));
@ -462,6 +465,8 @@ struct BackupToDBUpgradeWorkload : TestWorkload {
throw;
}
const_cast<ServerKnobs*>(SERVER_KNOBS)->PROXY_REJECT_BATCH_QUEUED_TOO_LONG = true;
return Void();
}
};