Added the ability to disable all commits which do not modify the system keys by setting \xff/onlySystem = 1 in the database

This commit is contained in:
Evan Tschannen 2018-08-21 21:09:50 -07:00
parent a694364a39
commit cb60002944
4 changed files with 26 additions and 1 deletions

View File

@ -558,3 +558,4 @@ std::pair<MetricNameRef, KeyRef> decodeMetricConfKey( KeyRef const& prefix, KeyR
const KeyRef maxUIDKey = LiteralStringRef("\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff");
const KeyRef databaseLockedKey = LiteralStringRef("\xff/dbLocked");
const KeyRef onlySystemTransactionsKey = LiteralStringRef("\xff/onlySystem");

View File

@ -250,5 +250,6 @@ extern const KeyRef metricConfPrefix;
extern const KeyRef maxUIDKey;
extern const KeyRef databaseLockedKey;
extern const KeyRef onlySystemTransactionsKey;
#endif

View File

@ -178,7 +178,7 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
}
}
}
} else if( m.param1 == databaseLockedKey || m.param1.startsWith(applyMutationsBeginRange.begin) ||
} else if( m.param1 == databaseLockedKey || m.param1 == onlySystemTransactionsKey || m.param1.startsWith(applyMutationsBeginRange.begin) ||
m.param1.startsWith(applyMutationsAddPrefixRange.begin) || m.param1.startsWith(applyMutationsRemovePrefixRange.begin) || m.param1.startsWith(tagLocalityListPrefix) || m.param1.startsWith(serverTagHistoryPrefix) ) {
if(!initialCommit) txnStateStore->set(KeyValueRef(m.param1, m.param2));
}
@ -333,6 +333,9 @@ static void applyMetadataMutations(UID const& dbgid, Arena &arena, VectorRef<Mut
if (range.contains(databaseLockedKey)) {
if(!initialCommit) txnStateStore->clear(singleKeyRange(databaseLockedKey));
}
if (range.contains(onlySystemTransactionsKey)) {
if(!initialCommit) txnStateStore->clear(singleKeyRange(onlySystemTransactionsKey));
}
if(range.intersects(applyMutationsEndRange)) {
KeyRangeRef commonEndRange(range & applyMutationsEndRange);
if(!initialCommit) txnStateStore->clear(commonEndRange);

View File

@ -506,6 +506,26 @@ ACTOR Future<Void> commitBatch(
state Optional<Key> lockedKey = self->txnStateStore->readValue(databaseLockedKey).get();
state bool locked = lockedKey.present() && lockedKey.get().size();
state Optional<Key> onlySystemKey = self->txnStateStore->readValue(onlySystemTransactionsKey).get();
state bool onlySystem = onlySystemKey.present() && onlySystemKey.get().size();
if(onlySystem) {
for (int t = 0; t<trs.size(); t++) {
if( committed[t] == ConflictBatch::TransactionCommitted ) {
bool foundSystem = false;
for(auto& m : trs[t].transaction.mutations) {
if( ( m.type == MutationRef::ClearRange ? m.param2 : m.param1 ) >= nonMetadataSystemKeys.end) {
foundSystem = true;
break;
}
}
if(!foundSystem) {
committed[t] = ConflictBatch::TransactionConflict;
}
}
}
}
if(forceRecovery) {
Void _ = wait( Future<Void>(Never()) );
}