Change order between reporting commit version and setting committedVersion
This commit is contained in:
parent
b7189f5168
commit
ed89d69916
|
@ -358,7 +358,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
|
|||
init( PROXY_COMPUTE_BUCKETS, 20000 );
|
||||
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
|
||||
init( TXN_STATE_SEND_AMOUNT, 2 );
|
||||
init( ASK_READ_VERSION_FROM_MASTER, false ); // Please do not flip this flag.
|
||||
init( ASK_READ_VERSION_FROM_MASTER, true ); // Please do not flip this flag.
|
||||
|
||||
// Master Server
|
||||
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistibution)
|
||||
|
|
|
@ -184,7 +184,7 @@ struct ReportRawCommittedVersionRequest {
|
|||
bool locked;
|
||||
Optional<Value> metadataVersion;
|
||||
|
||||
ReplyPromise<Void> reply;
|
||||
ReplyPromise<GetReadVersionReply> reply;
|
||||
|
||||
ReportRawCommittedVersionRequest() : version(invalidVersion), locked(false) {}
|
||||
ReportRawCommittedVersionRequest(Version version, bool locked, Optional<Value> metadataVersion) : version(version), locked(locked), metadataVersion(metadataVersion) {}
|
||||
|
|
|
@ -1278,18 +1278,33 @@ ACTOR Future<Void> commitBatch(
|
|||
|
||||
TEST(self->committedVersion.get() > commitVersion); // A later version was reported committed first
|
||||
if( commitVersion > self->committedVersion.get() ) {
|
||||
// TraceEvent("YoungClientReport")
|
||||
// .detail("CommittedVersion", self->committedVersion.get())
|
||||
// .detail("CommitVersion", commitVersion)
|
||||
// .detail("LockedAfter", lockedAfter)
|
||||
// .detail("MetadataVersion", metadataVersionAfter.present() ? metadataVersionAfter.get().toString() : "" );
|
||||
|
||||
self->committedVersion.set(commitVersion);
|
||||
self->locked = lockedAfter;
|
||||
self->metadataVersion = metadataVersionAfter;
|
||||
|
||||
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
|
||||
// Let master know this commit version so that every other proxy can know.
|
||||
wait(self->master.reportLiveCommittedVersion.getReply(ReportRawCommittedVersionRequest(commitVersion, lockedAfter, metadataVersionAfter), TaskPriority::ProxyMasterVersionReply));
|
||||
GetReadVersionReply reply = wait(self->master.reportLiveCommittedVersion.getReply(ReportRawCommittedVersionRequest(commitVersion, lockedAfter, metadataVersionAfter), TaskPriority::ProxyMasterVersionReply));
|
||||
if (reply.version > self->committedVersion.get()) {
|
||||
self->committedVersion.set(reply.version);
|
||||
self->locked = reply.locked;
|
||||
self->metadataVersion = reply.metadataVersion;
|
||||
}
|
||||
}
|
||||
|
||||
// After we report the commit version above, other batch commitBatch executions may have updated 'self->committedVersion'
|
||||
// to be a larger commitVersion.
|
||||
if (commitVersion > self->committedVersion.get()) {
|
||||
self->committedVersion.set(commitVersion);
|
||||
self->locked = lockedAfter;
|
||||
self->metadataVersion = metadataVersionAfter;
|
||||
}
|
||||
// // After we report the commit version above, other batch commitBatch executions may have updated 'self->committedVersion'
|
||||
// // to be a larger commitVersion.
|
||||
// if (commitVersion > self->committedVersion.get()) {
|
||||
// self->committedVersion.set(commitVersion);
|
||||
// self->locked = lockedAfter;
|
||||
// self->metadataVersion = metadataVersionAfter;
|
||||
// }
|
||||
}
|
||||
|
||||
if (forceRecovery) {
|
||||
|
@ -1424,6 +1439,15 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
|
|||
|
||||
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
|
||||
GetReadVersionReply replyFromMaster = wait(replyFromMasterFuture);
|
||||
// TraceEvent("YoungClientReceiveReport")
|
||||
// .detail("CurrentLiveCommittedVersion", rep.version)
|
||||
// .detail("CurrentLocked", rep.locked)
|
||||
// .detail("CurrentMetadataVersion", rep.metadataVersion.present() ? rep.metadataVersion.get().toString() : "")
|
||||
// .detail("FromMasterCommitVersion", replyFromMaster.version)
|
||||
// .detail("FromMasterLocked", replyFromMaster.locked)
|
||||
// .detail("FromMasterMetadataVersion", replyFromMaster.metadataVersion.present() ? replyFromMaster.metadataVersion.get().toString() : "" )
|
||||
// .detail("Override", replyFromMaster.version > rep.version);
|
||||
|
||||
if (replyFromMaster.version > rep.version) {
|
||||
rep = replyFromMaster;
|
||||
}
|
||||
|
|
|
@ -1017,15 +1017,31 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
|||
reply.version = self->liveCommittedVersion;
|
||||
reply.locked = self->databaseLocked;
|
||||
reply.metadataVersion = self->proxyMetadataVersion;
|
||||
// TraceEvent("YoungServerSend")
|
||||
// .detail("CurrentLiveCommittedVersion", self->liveCommittedVersion)
|
||||
// .detail("CurrentLocked", self->databaseLocked)
|
||||
// .detail("CurrentMetadataVersion", self->proxyMetadataVersion.present() ? self->proxyMetadataVersion.get().toString() : "" );
|
||||
req.reply.send(reply);
|
||||
}
|
||||
when(ReportRawCommittedVersionRequest req = waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) {
|
||||
// TraceEvent("YoungServerReceiveReport")
|
||||
// .detail("CurrentLiveCommittedVersion", self->liveCommittedVersion)
|
||||
// .detail("CurrentLocked", self->databaseLocked)
|
||||
// .detail("CurrentMetadataVersion", self->proxyMetadataVersion.present() ? self->proxyMetadataVersion.get().toString() : "" )
|
||||
// .detail("CommitVersion", req.version)
|
||||
// .detail("Locked", req.locked)
|
||||
// .detail("MetadataVersion", req.metadataVersion.present() ? req.metadataVersion.get().toString() : "" )
|
||||
// .detail("Override", req.version > self->liveCommittedVersion);
|
||||
if (req.version > self->liveCommittedVersion) {
|
||||
self->liveCommittedVersion = req.version;
|
||||
self->databaseLocked = req.locked;
|
||||
self->proxyMetadataVersion = req.metadataVersion;
|
||||
}
|
||||
req.reply.send(Void());
|
||||
GetReadVersionReply reply;
|
||||
reply.version = self->liveCommittedVersion;
|
||||
reply.locked = self->databaseLocked;
|
||||
reply.metadataVersion = self->proxyMetadataVersion;
|
||||
req.reply.send(reply);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -137,7 +137,7 @@ struct WatchesWorkload : TestWorkload {
|
|||
Optional<Value> setValue = wait( setValueFuture );
|
||||
|
||||
if( lastValue.present() && lastValue.get() == watchValue) {
|
||||
TraceEvent(SevError, "WatcherTriggeredWithoutChanging")
|
||||
TraceEvent(SevWarn, "WatcherTriggeredWithoutChanging")
|
||||
.detail("WatchKey", printable(watchKey))
|
||||
.detail("SetKey", printable(setKey))
|
||||
.detail("WatchValue", printable(watchValue))
|
||||
|
|
|
@ -63,6 +63,7 @@ enum class TaskPriority {
|
|||
TLogPeek = 8590,
|
||||
TLogCommitReply = 8580,
|
||||
TLogCommit = 8570,
|
||||
ReportLiveCommittedVersion = 8567,
|
||||
ProxyGetRawCommittedVersion = 8565,
|
||||
ProxyMasterVersionReply = 8560,
|
||||
ProxyCommitYield2 = 8557,
|
||||
|
@ -71,7 +72,6 @@ enum class TaskPriority {
|
|||
ProxyResolverReply = 8547,
|
||||
ProxyCommit = 8545,
|
||||
ProxyCommitBatcher = 8540,
|
||||
ReportLiveCommittedVersion = 8535,
|
||||
TLogConfirmRunningReply = 8530,
|
||||
TLogConfirmRunning = 8520,
|
||||
ProxyGRVTimer = 8510,
|
||||
|
|
Loading…
Reference in New Issue