changing a storage server’s tag must be the first mutations applied in a version, because privatized mutations applied earlier in the same version will use the old tag

This commit is contained in:
Evan Tschannen 2018-02-09 18:21:29 -08:00
parent c7b3be5b19
commit fbadcc6eea
9 changed files with 47 additions and 14 deletions

View File

@ -610,7 +610,7 @@ ACTOR Future<int> dumpData(Database cx, PromiseStream<RCGroup> results, Referenc
// choose, it's impossible for us to get a transaction_too_old error back, and it's impossible
// for our transaction to be aborted due to conflicts.
req.transaction.read_snapshot = committedVersion->get();
req.isLockAware = true;
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
totalBytes += mutationSize;
Void _ = wait( commitLock->take(TaskDefaultYield, mutationSize) );
@ -651,7 +651,7 @@ ACTOR Future<Void> coalesceKeyVersionCache(Key uid, Version endVersion, Referenc
req.transaction.write_conflict_ranges.push_back_deep(req.arena, singleKeyRange(countKey));
req.transaction.mutations.push_back_deep(req.arena, MutationRef(MutationRef::AddValue, countKey, StringRef((uint8_t*)&removed, 8)));
req.transaction.read_snapshot = committedVersion->get();
req.isLockAware = true;
req.flags = req.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
Void _ = wait( commitLock->take(TaskDefaultYield, mutationSize) );
addActor.send( commitLock->releaseWhen( success(commit.getReply(req)), mutationSize ) );

View File

@ -74,15 +74,25 @@ struct CommitID {
};
struct CommitTransactionRequest {
enum {
FLAG_IS_LOCK_AWARE = 0x1,
FLAG_FIRST_IN_BATCH = 0x2
};
bool isLockAware() const { return flags & FLAG_IS_LOCK_AWARE; }
bool firstInBatch() const { return flags & FLAG_FIRST_IN_BATCH; }
Arena arena;
CommitTransactionRef transaction;
ReplyPromise<CommitID> reply;
ReplyPromise<CommitID> reply;
uint32_t flags;
Optional<UID> debugID;
bool isLockAware;
CommitTransactionRequest() : flags(0) {}
template <class Ar>
void serialize(Ar& ar) {
ar & transaction & reply & arena & debugID & isLockAware;
ar & transaction & reply & arena & flags & debugID;
}
};

View File

@ -2514,7 +2514,12 @@ Future<Void> Transaction::commitMutations() {
TraceEvent("TransactionMutation", u).detail("T", i->type).detail("P1", printable(i->param1)).detail("P2", printable(i->param2));
}
tr.isLockAware = options.lockAware;
if(options.lockAware) {
tr.flags = tr.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
}
if(options.firstInBatch) {
tr.flags = tr.flags | CommitTransactionRequest::FLAG_FIRST_IN_BATCH;
}
Future<Void> commitResult = tryCommit( cx, trLogInfo, tr, readVersion, info, &this->committedVersion, this, options );
@ -2655,6 +2660,11 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
}
break;
case FDBTransactionOptions::FIRST_IN_BATCH:
validateOptionValue(value, false);
options.firstInBatch = true;
break;
default:
break;
}

View File

@ -150,6 +150,7 @@ struct TransactionOptions {
bool debugDump : 1;
bool lockAware : 1;
bool readOnly : 1;
bool firstInBatch : 1;
TransactionOptions() {
reset();

View File

@ -174,6 +174,8 @@ description is not currently required but encouraged.
description="By default, operations that are performed on a transaction while it is being committed will not only fail themselves, but they will attempt to fail other in-flight operations (such as the commit) as well. This behavior is intended to help developers discover situations where operations could be unintentionally executed after the transaction has been reset. Setting this option removes that protection, causing only the offending operation to fail."/>
<Option name="read_lock_aware" code="702"
description="The transaction can read from locked databases."/>
<Option name="first_in_batch" code="710"
description="No other transactions will be applied before this transaction within the same commit version."/>
</Scope>
<!-- The enumeration values matter - do not change them without

View File

@ -39,6 +39,13 @@ void logOnReceive(CommitTransactionRequest x) {
g_traceBatch.addEvent("CommitDebug", x.debugID.get().first(), "MasterProxyServer.batcher");
}
template <class X>
bool firstInBatch(X x) { return false; }
bool firstInBatch(CommitTransactionRequest x) {
return x.firstInBatch();
}
ACTOR template <class X>
Future<Void> batcher(PromiseStream<std::vector<X>> out, FutureStream<X> in, double avgMinDelay, double* avgMaxDelay, double emptyBatchTimeout, int maxCount, int desiredBytes, int maxBytes, Optional<PromiseStream<Void>> batchStartedStream, int taskID = TaskDefaultDelay, Counter* counter = 0)
{
@ -73,7 +80,8 @@ Future<Void> batcher(PromiseStream<std::vector<X>> out, FutureStream<X> in, doub
}
int bytes = getBytes( x );
if(batchBytes + bytes > maxBytes && batch.size()) {
bool first = firstInBatch( x );
if((batchBytes + bytes > maxBytes || first) && batch.size()) {
out.send(batch);
lastBatch = now();
if(batchStartedStream.present())

View File

@ -506,7 +506,7 @@ ACTOR Future<Void> commitBatch(
state int commitCount = 0;
for (t = 0; t < trs.size() && !forceRecovery; t++)
{
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware)) {
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
commitCount++;
applyMetadataMutations(self->dbgid, arena, trs[t].transaction.mutations, self->txnStateStore, &toCommit, &forceRecovery, self->logSystem, commitVersion+1, &self->vecBackupKeys, &self->keyInfo, self->firstProxy ? &self->uid_applyMutationsData : NULL, self->commit, self->cx, &self->committedVersion, &self->storageCache);
}
@ -545,7 +545,7 @@ ACTOR Future<Void> commitBatch(
for (int t = 0; t<trs.size(); t++) {
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware)) {
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
for (auto m : trs[t].transaction.mutations) {
mutationCount++;
@ -829,7 +829,7 @@ ACTOR Future<Void> commitBatch(
// Send replies to clients
for (int t = 0; t < trs.size(); t++)
{
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware)) {
if (committed[t] == ConflictBatch::TransactionCommitted && (!locked || trs[t].isLockAware())) {
ASSERT_WE_THINK(commitVersion != invalidVersion);
trs[t].reply.send(CommitID(commitVersion, t));
}

View File

@ -1139,7 +1139,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
state bool debugResult = debug_checkMinRestoredVersion( UID(), self->lastEpochEnd, "DBRecovery", SevWarn );
CommitTransactionRequest recoveryCommitRequest;
recoveryCommitRequest.isLockAware = true;
recoveryCommitRequest.flags = recoveryCommitRequest.flags | CommitTransactionRequest::FLAG_IS_LOCK_AWARE;
CommitTransactionRef &tr = recoveryCommitRequest.transaction;
int mmApplied = 0; // The number of mutations in tr.mutations that have been applied to the txnStateStore so far
if (self->lastEpochEnd != 0) {

View File

@ -3281,8 +3281,9 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
KeyRange conflictRange = singleKeyRange(serverTagConflictKeyFor(rep.newTag.get()));
tr.addReadConflictRange( conflictRange );
tr.addWriteConflictRange( conflictRange );
tr.set(serverTagKeyFor(ssi.id()), serverTagValue(rep.newTag.get()));
tr.atomicOp(serverTagHistoryKeyFor(ssi.id()), serverTagValue(rep.tag), MutationRef::SetVersionstampedKey);
tr.setOption(FDBTransactionOptions::FIRST_IN_BATCH);
tr.set( serverTagKeyFor(ssi.id()), serverTagValue(rep.newTag.get()) );
tr.atomicOp( serverTagHistoryKeyFor(ssi.id()), serverTagValue(rep.tag), MutationRef::SetVersionstampedKey );
tr.atomicOp( serverMaxTagKeyFor(rep.newTag.get().locality), serverTagMaxValue(rep.newTag.get()), MutationRef::Max );
}
@ -3300,10 +3301,11 @@ ACTOR Future<Void> replaceInterface( StorageServer* self, StorageServerInterface
self->tag = rep.tag;
}
for(auto it : self->history) {
TraceEvent("SSHistory", self->thisServerID).detail("ver", it.first).detail("tag", it.second.toString());
TraceEvent("SSHistory", self->thisServerID).detail("ver", it.first).detail("tag", it.second.toString()).detail("myTag", self->tag.toString());
}
if(self->history.size() && BUGGIFY) {
TraceEvent("SSHistoryReboot", self->thisServerID);
throw please_reboot();
}