wait for prev
This commit is contained in:
parent
4676dacaab
commit
7c285aa4cf
|
@ -879,8 +879,9 @@ ACTOR Future<Void> applyMetadataToCommittedTransactions(CommitBatchContext* self
|
|||
// Message the sequencer to obtain the previous commit version for each storage server's tag
|
||||
ACTOR Future<Void> getTPCV(CommitBatchContext* self) {
|
||||
state ProxyCommitData* const pProxyCommitData = self->pProxyCommitData;
|
||||
GetTLogPrevCommitVersionReply rep = wait(brokenPromiseToNever(
|
||||
pProxyCommitData->master.getTLogPrevCommitVersion.getReply(GetTLogPrevCommitVersionRequest(self->writtenTLogs))));
|
||||
GetTLogPrevCommitVersionReply rep =
|
||||
wait(brokenPromiseToNever(pProxyCommitData->master.getTLogPrevCommitVersion.getReply(
|
||||
GetTLogPrevCommitVersionRequest(self->writtenTLogs))));
|
||||
// TraceEvent("GetTLogPrevCommitVersionRequest");
|
||||
return Void();
|
||||
}
|
||||
|
@ -1189,9 +1190,12 @@ ACTOR Future<Void> postResolution(CommitBatchContext* self) {
|
|||
if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2)
|
||||
debug_advanceMaxCommittedVersion(UID(), self->commitVersion); //< Is this valid?
|
||||
|
||||
//TraceEvent("ProxyPush", pProxyCommitData->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion)
|
||||
// .detail("TransactionsSubmitted", trs.size()).detail("TransactionsCommitted", commitCount).detail("TxsPopTo",
|
||||
// msg.popTo);
|
||||
// TraceEvent("ProxyPush", pProxyCommitData->dbgid)
|
||||
// .detail("PrevVersion", self->prevVersion)
|
||||
// .detail("Version", self->commitVersion)
|
||||
// .detail("TransactionsSubmitted", trs.size())
|
||||
// .detail("TransactionsCommitted", self->commitCount)
|
||||
// .detail("TxsPopTo", self->msg.popTo);
|
||||
|
||||
if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2)
|
||||
debug_advanceMaxCommittedVersion(UID(), self->commitVersion);
|
||||
|
@ -1281,7 +1285,9 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
|
|||
if (self->prevVersion && self->commitVersion - self->prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT / 2)
|
||||
debug_advanceMinCommittedVersion(UID(), self->commitVersion);
|
||||
|
||||
//TraceEvent("ProxyPushed", pProxyCommitData->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion);
|
||||
// TraceEvent("ProxyPushed", pProxyCommitData->dbgid)
|
||||
// .detail("PrevVersion", self->prevVersion)
|
||||
// .detail("Version", self->commitVersion);
|
||||
if (debugID.present())
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "CommitProxyServer.commitBatch.AfterLogPush");
|
||||
|
||||
|
@ -1307,6 +1313,7 @@ ACTOR Future<Void> reply(CommitBatchContext* self) {
|
|||
self->lockedAfter,
|
||||
self->metadataVersionAfter,
|
||||
pProxyCommitData->minKnownCommittedVersion,
|
||||
self->prevVersion,
|
||||
writtenTags),
|
||||
TaskPriority::ProxyMasterVersionReply));
|
||||
}
|
||||
|
|
|
@ -217,6 +217,7 @@ struct ReportRawCommittedVersionRequest {
|
|||
bool locked;
|
||||
Optional<Value> metadataVersion;
|
||||
Version minKnownCommittedVersion;
|
||||
Optional<Version> prevVersion; // if present, wait for prevVersion to be committed before replying
|
||||
Optional<std::set<Tag>> writtenTags;
|
||||
ReplyPromise<Void> reply;
|
||||
|
||||
|
@ -225,13 +226,14 @@ struct ReportRawCommittedVersionRequest {
|
|||
bool locked,
|
||||
Optional<Value> metadataVersion,
|
||||
Version minKnownCommittedVersion,
|
||||
Optional<Version> prevVersion,
|
||||
Optional<std::set<Tag>> writtenTags = Optional<std::set<Tag>>())
|
||||
: version(version), locked(locked), metadataVersion(metadataVersion),
|
||||
minKnownCommittedVersion(minKnownCommittedVersion), writtenTags(writtenTags) {}
|
||||
minKnownCommittedVersion(minKnownCommittedVersion), writtenTags(writtenTags), prevVersion(prevVersion) {}
|
||||
|
||||
template <class Ar>
|
||||
void serialize(Ar& ar) {
|
||||
serializer(ar, version, locked, metadataVersion, minKnownCommittedVersion, writtenTags, reply);
|
||||
serializer(ar, version, locked, metadataVersion, minKnownCommittedVersion, prevVersion, writtenTags, reply);
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -183,7 +183,7 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
|
|||
recoveryTransactionVersion; // The first version in this epoch
|
||||
double lastCommitTime;
|
||||
|
||||
Version liveCommittedVersion; // The largest live committed version reported by commit proxies.
|
||||
NotifiedVersion liveCommittedVersion; // The largest live committed version reported by commit proxies.
|
||||
bool databaseLocked;
|
||||
Optional<Value> proxyMetadataVersion;
|
||||
Version minKnownCommittedVersion;
|
||||
|
@ -1221,6 +1221,31 @@ ACTOR Future<Void> provideVersions(Reference<MasterData> self) {
|
|||
}
|
||||
}
|
||||
|
||||
void updateLiveCommittedVersion(Reference<MasterData> self, ReportRawCommittedVersionRequest req) {
|
||||
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion);
|
||||
if (req.version > self->liveCommittedVersion.get()) {
|
||||
self->databaseLocked = req.locked;
|
||||
self->proxyMetadataVersion = req.metadataVersion;
|
||||
// Note the set call switches context to any waiters on liveCommittedVersion before continuing.
|
||||
self->liveCommittedVersion.set(req.version);
|
||||
}
|
||||
++self->reportLiveCommittedVersionRequests;
|
||||
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) {
|
||||
// NB: this if-condition is not needed after wait-for-prev is ported to this branch
|
||||
if (req.version > self->ssVersionVector.maxVersion) {
|
||||
// TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version);
|
||||
self->ssVersionVector.setVersions(req.writtenTags.get(), req.version);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> waitForPrev(Reference<MasterData> self, ReportRawCommittedVersionRequest req) {
|
||||
wait(self->liveCommittedVersion.whenAtLeast(req.prevVersion.get()));
|
||||
updateLiveCommittedVersion(self, req);
|
||||
req.reply.send(Void());
|
||||
return Void();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
||||
loop {
|
||||
choose {
|
||||
|
@ -1230,12 +1255,12 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
|||
req.debugID.get().first(),
|
||||
"MasterServer.serveLiveCommittedVersion.GetRawCommittedVersion");
|
||||
|
||||
if (self->liveCommittedVersion == invalidVersion) {
|
||||
self->liveCommittedVersion = self->recoveryTransactionVersion;
|
||||
if (self->liveCommittedVersion.get() == invalidVersion) {
|
||||
self->liveCommittedVersion.set(self->recoveryTransactionVersion);
|
||||
}
|
||||
++self->getLiveCommittedVersionRequests;
|
||||
GetRawCommittedVersionReply reply;
|
||||
reply.version = self->liveCommittedVersion;
|
||||
reply.version = self->liveCommittedVersion.get();
|
||||
reply.locked = self->databaseLocked;
|
||||
reply.metadataVersion = self->proxyMetadataVersion;
|
||||
reply.minKnownCommittedVersion = self->minKnownCommittedVersion;
|
||||
|
@ -1244,21 +1269,14 @@ ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
|
|||
}
|
||||
when(ReportRawCommittedVersionRequest req =
|
||||
waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) {
|
||||
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, req.minKnownCommittedVersion);
|
||||
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.writtenTags.present()) {
|
||||
// NB: this if-condition is not needed after wait-for-prev is ported to this branch
|
||||
if (req.version > self->ssVersionVector.maxVersion) {
|
||||
// TraceEvent("Received ReportRawCommittedVersionRequest").detail("Version",req.version);
|
||||
self->ssVersionVector.setVersions(req.writtenTags.get(), req.version);
|
||||
}
|
||||
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR && req.prevVersion.present() &&
|
||||
(self->liveCommittedVersion.get() != invalidVersion) &&
|
||||
(self->liveCommittedVersion.get() < req.prevVersion.get())) {
|
||||
self->addActor.send(waitForPrev(self, req));
|
||||
} else {
|
||||
updateLiveCommittedVersion(self, req);
|
||||
req.reply.send(Void());
|
||||
}
|
||||
if (req.version > self->liveCommittedVersion) {
|
||||
self->liveCommittedVersion = req.version;
|
||||
self->databaseLocked = req.locked;
|
||||
self->proxyMetadataVersion = req.metadataVersion;
|
||||
}
|
||||
++self->reportLiveCommittedVersionRequests;
|
||||
req.reply.send(Void());
|
||||
}
|
||||
when(GetTLogPrevCommitVersionRequest req =
|
||||
waitNext(self->myInterface.getTLogPrevCommitVersion.getFuture())) {
|
||||
|
|
Loading…
Reference in New Issue