Merge branch 'release-4.6' into release-5.0
This commit is contained in:
parent
333f2e4865
commit
1626e16377
|
@ -1375,6 +1375,7 @@ public:
|
|||
}
|
||||
|
||||
ACTOR static Future<Void> atomicSwitchover(DatabaseBackupAgent* backupAgent, Database dest, Key tagName, Standalone<VectorRef<KeyRangeRef>> backupRanges, Key addPrefix, Key removePrefix) {
|
||||
state DatabaseBackupAgent drAgent(dest);
|
||||
state UID destlogUid = wait(backupAgent->getLogUid(dest, tagName));
|
||||
state int status = wait(backupAgent->getStateValue(dest, destlogUid));
|
||||
|
||||
|
@ -1385,7 +1386,7 @@ public:
|
|||
|
||||
state UID logUid = g_random->randomUniqueID();
|
||||
state Key logUidValue = BinaryWriter::toValue(logUid, Unversioned());
|
||||
state UID logUidCurrent = wait(backupAgent->getLogUid(backupAgent->taskBucket->src, tagName));
|
||||
state UID logUidCurrent = wait(drAgent.getLogUid(backupAgent->taskBucket->src, tagName));
|
||||
|
||||
if (logUidCurrent.isValid()) {
|
||||
logUid = logUidCurrent;
|
||||
|
@ -1446,7 +1447,7 @@ public:
|
|||
TraceEvent("DBA_switchover_stopped");
|
||||
|
||||
try {
|
||||
Void _ = wait( backupAgent->submitBackup(backupAgent->taskBucket->src, tagName, backupRanges, false, addPrefix, removePrefix, true, true) );
|
||||
Void _ = wait( drAgent.submitBackup(backupAgent->taskBucket->src, tagName, backupRanges, false, addPrefix, removePrefix, true, true) );
|
||||
} catch( Error &e ) {
|
||||
if( e.code() != error_code_backup_duplicate )
|
||||
throw;
|
||||
|
@ -1454,7 +1455,7 @@ public:
|
|||
|
||||
TraceEvent("DBA_switchover_submitted");
|
||||
|
||||
int _ = wait( backupAgent->waitSubmitted(backupAgent->taskBucket->src, tagName) );
|
||||
int _ = wait( drAgent.waitSubmitted(backupAgent->taskBucket->src, tagName) );
|
||||
|
||||
TraceEvent("DBA_switchover_started");
|
||||
|
||||
|
|
|
@ -549,10 +549,14 @@ ACTOR Future<Void> readTransactionSystemState( Reference<MasterData> self, Refer
|
|||
|
||||
// Recover version info
|
||||
self->lastEpochEnd = oldLogSystem->getEnd() - 1;
|
||||
if (self->lastEpochEnd == 0)
|
||||
if (self->lastEpochEnd == 0) {
|
||||
self->recoveryTransactionVersion = 1;
|
||||
else
|
||||
} else {
|
||||
self->recoveryTransactionVersion = self->lastEpochEnd + SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT;
|
||||
if(BUGGIFY) {
|
||||
self->recoveryTransactionVersion += g_random->randomInt64(0, 1e6*SERVER_KNOBS->VERSIONS_PER_SECOND);
|
||||
}
|
||||
}
|
||||
|
||||
TraceEvent("MasterRecovering", self->dbgid).detail("lastEpochEnd", self->lastEpochEnd).detail("recoveryTransactionVersion", self->recoveryTransactionVersion);
|
||||
|
||||
|
|
|
@ -47,7 +47,7 @@ Future<Void> ApiWorkload::clearKeyspace() {
|
|||
|
||||
ACTOR Future<Void> setup(Database cx, ApiWorkload *self) {
|
||||
state Future<Void> disabler = disableConnectionFailuresAfter(300, "ApiWorkload");
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<FlowTransactionWrapper<Transaction>, const Database>(cx));
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<FlowTransactionWrapper<Transaction>, const Database>(cx, cx, false));
|
||||
|
||||
//Clear keyspace before running
|
||||
Void _ = wait(timeoutError(self->clearKeyspace(), 600));
|
||||
|
@ -270,25 +270,25 @@ ACTOR Future<Void> chooseTransactionFactory(Database cx, std::vector<Transaction
|
|||
|
||||
if(transactionType == NATIVE) {
|
||||
printf("client %d: Running NativeAPI Transactions\n", self->clientPrefixInt);
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<FlowTransactionWrapper<Transaction>, const Database>(cx));
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<FlowTransactionWrapper<Transaction>, const Database>(cx, self->extraDB, self->useExtraDB));
|
||||
}
|
||||
else if(transactionType == READ_YOUR_WRITES)
|
||||
{
|
||||
printf("client %d: Running ReadYourWrites Transactions\n", self->clientPrefixInt);
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<FlowTransactionWrapper<ReadYourWritesTransaction>, const Database>(cx));
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<FlowTransactionWrapper<ReadYourWritesTransaction>, const Database>(cx, self->extraDB, self->useExtraDB));
|
||||
}
|
||||
else if(transactionType == THREAD_SAFE)
|
||||
{
|
||||
printf("client %d: Running ThreadSafe Transactions\n", self->clientPrefixInt);
|
||||
Reference<IDatabase> dbHandle = wait(unsafeThreadFutureToFuture(ThreadSafeDatabase::createFromExistingDatabase(cx)));
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<ThreadTransactionWrapper, Reference<IDatabase>>(dbHandle));
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<ThreadTransactionWrapper, Reference<IDatabase>>(dbHandle, dbHandle, false));
|
||||
}
|
||||
else if(transactionType == MULTI_VERSION)
|
||||
{
|
||||
printf("client %d: Running Multi-Version Transactions\n", self->clientPrefixInt);
|
||||
Reference<IDatabase> threadSafeHandle = wait(unsafeThreadFutureToFuture(ThreadSafeDatabase::createFromExistingDatabase(cx)));
|
||||
Reference<IDatabase> dbHandle = MultiVersionDatabase::debugCreateFromExistingDatabase(threadSafeHandle);
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<ThreadTransactionWrapper, Reference<IDatabase>>(dbHandle));
|
||||
self->transactionFactory = Reference<TransactionFactoryInterface>(new TransactionFactory<ThreadTransactionWrapper, Reference<IDatabase>>(dbHandle, dbHandle, false));
|
||||
}
|
||||
|
||||
return Void();
|
||||
|
|
|
@ -85,9 +85,16 @@ struct TransactionWrapper : public ReferenceCounted<TransactionWrapper> {
|
|||
//A wrapper class for flow based transactions (NativeAPI, ReadYourWrites)
|
||||
template<class T>
|
||||
struct FlowTransactionWrapper : public TransactionWrapper {
|
||||
|
||||
Database cx;
|
||||
Database extraDB;
|
||||
bool useExtraDB;
|
||||
T transaction;
|
||||
FlowTransactionWrapper(Database cx) : transaction(cx) { }
|
||||
T lastTransaction;
|
||||
FlowTransactionWrapper(Database cx, Database extraDB, bool useExtraDB) : cx(cx), extraDB(extraDB), useExtraDB(useExtraDB), transaction(cx) {
|
||||
if(useExtraDB && g_random->random01() < 0.5) {
|
||||
transaction = T(extraDB);
|
||||
}
|
||||
}
|
||||
virtual ~FlowTransactionWrapper() { }
|
||||
|
||||
//Sets a key-value pair in the database
|
||||
|
@ -132,7 +139,12 @@ struct FlowTransactionWrapper : public TransactionWrapper {
|
|||
|
||||
//Processes transaction error conditions
|
||||
Future<Void> onError(Error const& e) {
|
||||
return transaction.onError(e);
|
||||
Future<Void> returnVal = transaction.onError(e);
|
||||
if( useExtraDB ) {
|
||||
lastTransaction = std::move(transaction);
|
||||
transaction = T( g_random->random01() < 0.5 ? extraDB : cx );
|
||||
}
|
||||
return returnVal;
|
||||
}
|
||||
|
||||
//Gets the read version of a transaction
|
||||
|
@ -160,7 +172,7 @@ struct ThreadTransactionWrapper : public TransactionWrapper {
|
|||
|
||||
Reference<ITransaction> transaction;
|
||||
|
||||
ThreadTransactionWrapper(Reference<IDatabase> db) : transaction(db->createTransaction()) { }
|
||||
ThreadTransactionWrapper(Reference<IDatabase> db, Reference<IDatabase> extraDB, bool useExtraDB) : transaction(db->createTransaction()) { }
|
||||
virtual ~ThreadTransactionWrapper() { }
|
||||
|
||||
//Sets a key-value pair in the database
|
||||
|
@ -237,17 +249,21 @@ struct TransactionFactory : public TransactionFactoryInterface {
|
|||
|
||||
//The database used to create transaction (of type Database, Reference<ThreadSafeDatabase>, etc.)
|
||||
DB dbHandle;
|
||||
DB extraDbHandle;
|
||||
bool useExtraDB;
|
||||
|
||||
TransactionFactory(DB dbHandle) : dbHandle(dbHandle) { }
|
||||
TransactionFactory(DB dbHandle, DB extraDbHandle, bool useExtraDB) : dbHandle(dbHandle), extraDbHandle(extraDbHandle), useExtraDB(useExtraDB) { }
|
||||
virtual ~TransactionFactory() { }
|
||||
|
||||
//Creates a new transaction
|
||||
Reference<TransactionWrapper> createTransaction() {
|
||||
return Reference<TransactionWrapper>(new T(dbHandle));
|
||||
return Reference<TransactionWrapper>(new T(dbHandle, extraDbHandle, useExtraDB));
|
||||
}
|
||||
};
|
||||
|
||||
struct ApiWorkload : TestWorkload {
|
||||
bool useExtraDB;
|
||||
Database extraDB;
|
||||
|
||||
ApiWorkload(WorkloadContext const& wcx, int maxClients = -1) : TestWorkload(wcx), success(true), transactionFactory(NULL), maxClients(maxClients) {
|
||||
clientPrefixInt = getOption(options, LiteralStringRef("clientId"), clientId);
|
||||
|
@ -262,6 +278,13 @@ struct ApiWorkload : TestWorkload {
|
|||
maxLongKeyLength = getOption(options, LiteralStringRef("maxLongKeyLength"), 128);
|
||||
minValueLength = getOption(options, LiteralStringRef("minValueLength"), 1);
|
||||
maxValueLength = getOption(options, LiteralStringRef("maxValueLength"), 10000);
|
||||
|
||||
useExtraDB = g_simulator.extraDB != NULL;
|
||||
if(useExtraDB) {
|
||||
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
|
||||
Reference<Cluster> extraCluster = Cluster::createCluster(extraFile, -1);
|
||||
extraDB = extraCluster->createDatabase(LiteralStringRef("DB")).get();
|
||||
}
|
||||
}
|
||||
|
||||
Future<Void> setup(Database const& cx);
|
||||
|
|
|
@ -54,15 +54,13 @@ struct ChangeConfigWorkload : TestWorkload {
|
|||
|
||||
virtual void getMetrics( vector<PerfMetric>& m ) {}
|
||||
|
||||
ACTOR Future<Void> ChangeConfigClient( Database cx, ChangeConfigWorkload *self) {
|
||||
state Future<Void> disabler = disableConnectionFailuresAfter(300, "ChangeConfig");
|
||||
Void _ = wait( delay( self->minDelayBeforeChange + g_random->random01() * ( self->maxDelayBeforeChange - self->minDelayBeforeChange ) ) );
|
||||
|
||||
ACTOR Future<Void> extraDatabaseConfigure(ChangeConfigWorkload *self) {
|
||||
if (g_network->isSimulated() && g_simulator.extraDB) {
|
||||
Reference<ClusterConnectionFile> extraFile(new ClusterConnectionFile(*g_simulator.extraDB));
|
||||
Reference<Cluster> cluster = Cluster::createCluster(extraFile, -1);
|
||||
state Database extraDB = cluster->createDatabase(LiteralStringRef("DB")).get();
|
||||
|
||||
Void _ = wait(delay(5*g_random->random01()));
|
||||
if (self->configMode.size())
|
||||
ConfigurationResult::Type _ = wait(changeConfig(extraDB, self->configMode));
|
||||
if (self->networkAddresses.size()) {
|
||||
|
@ -71,6 +69,19 @@ struct ChangeConfigWorkload : TestWorkload {
|
|||
else
|
||||
CoordinatorsResult::Type _ = wait(changeQuorum(extraDB, specifiedQuorumChange(NetworkAddress::parseList(self->networkAddresses))));
|
||||
}
|
||||
Void _ = wait(delay(5*g_random->random01()));
|
||||
}
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> ChangeConfigClient( Database cx, ChangeConfigWorkload *self) {
|
||||
state Future<Void> disabler = disableConnectionFailuresAfter(300, "ChangeConfig");
|
||||
Void _ = wait( delay( self->minDelayBeforeChange + g_random->random01() * ( self->maxDelayBeforeChange - self->minDelayBeforeChange ) ) );
|
||||
|
||||
state bool extraConfigureBefore = g_random->random01() < 0.5;
|
||||
|
||||
if(extraConfigureBefore) {
|
||||
Void _ = wait( self->extraDatabaseConfigure(self) );
|
||||
}
|
||||
|
||||
if( self->configMode.size() )
|
||||
|
@ -81,6 +92,11 @@ struct ChangeConfigWorkload : TestWorkload {
|
|||
else
|
||||
CoordinatorsResult::Type _ = wait( changeQuorum( cx, specifiedQuorumChange(NetworkAddress::parseList( self->networkAddresses )) ) );
|
||||
}
|
||||
|
||||
if(!extraConfigureBefore) {
|
||||
Void _ = wait( self->extraDatabaseConfigure(self) );
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
|
|
@ -808,6 +808,11 @@ struct WriteDuringReadWorkload : TestWorkload {
|
|||
self->changeCount.insert( allKeys, 0 );
|
||||
doingCommit = false;
|
||||
//TraceEvent("WDRError").error(e, true);
|
||||
if(e.code() == error_code_database_locked) {
|
||||
self->memoryDatabase = self->lastCommittedDatabase;
|
||||
self->addedConflicts.insert(allKeys, false);
|
||||
return Void();
|
||||
}
|
||||
if( e.code() == error_code_not_committed || e.code() == error_code_commit_unknown_result || e.code() == error_code_transaction_too_large || e.code() == error_code_key_too_large || e.code() == error_code_value_too_large || cancelled )
|
||||
throw not_committed();
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
testTitle=ApiCorrectnessTest
|
||||
testName=ApiCorrectness
|
||||
runSetup=true
|
||||
clearAfterTest=true
|
||||
numKeys=5000
|
||||
onlyLowerCase=true
|
||||
shortKeysRatio=0.5
|
||||
minShortKeyLength=1
|
||||
maxShortKeyLength=3
|
||||
minLongKeyLength=1
|
||||
maxLongKeyLength=128
|
||||
minValueLength=1
|
||||
maxValueLength=1000
|
||||
numGets=1000
|
||||
numGetRanges=100
|
||||
numGetRangeSelectors=100
|
||||
numGetKeys=100
|
||||
numClears=100
|
||||
numClearRanges=10
|
||||
maxTransactionBytes=500000
|
||||
randomTestDuration=60
|
||||
timeout=2100
|
||||
|
||||
testName=AtomicSwitchover
|
||||
switch1After=10.0
|
||||
switch2After=20.0
|
||||
stopAfter=130.0
|
||||
clearAfterTest=false
|
||||
simBackupAgents=BackupToDB
|
||||
extraDB=2
|
Loading…
Reference in New Issue