Merge pull request #3307 from yliucode/read-version

Serve GetReadVersion through master
This commit is contained in:
Evan Tschannen 2020-06-23 15:10:39 -07:00 committed by GitHub
commit 01030672a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 99 additions and 13 deletions

View File

@ -358,6 +358,7 @@ void ServerKnobs::initialize(bool randomize, ClientKnobs* clientKnobs, bool isSi
init( PROXY_COMPUTE_BUCKETS, 20000 );
init( PROXY_COMPUTE_GROWTH_RATE, 0.01 );
init( TXN_STATE_SEND_AMOUNT, 2 );
init( ASK_READ_VERSION_FROM_MASTER, true );
// Master Server
// masterCommitter() in the master server will allow lower priority tasks (e.g. DataDistribution)

View File

@ -288,6 +288,7 @@ public:
int PROXY_COMPUTE_BUCKETS;
double PROXY_COMPUTE_GROWTH_RATE;
int TXN_STATE_SEND_AMOUNT;
bool ASK_READ_VERSION_FROM_MASTER;
// Master Server
double COMMIT_SLEEP_TIME;

View File

@ -38,6 +38,10 @@ struct MasterInterface {
RequestStream< struct ChangeCoordinatorsRequest > changeCoordinators;
RequestStream< struct GetCommitVersionRequest > getCommitVersion;
RequestStream<struct BackupWorkerDoneRequest> notifyBackupWorkerDone;
// Get the centralized live committed version reported by proxies.
RequestStream< struct GetRawCommittedVersionRequest > getLiveCommittedVersion;
// Report a proxy's committed version.
RequestStream< struct ReportRawCommittedVersionRequest> reportLiveCommittedVersion;
NetworkAddress address() const { return changeCoordinators.getEndpoint().getPrimaryAddress(); }
NetworkAddressList addresses() const { return changeCoordinators.getEndpoint().addresses; }
@ -54,6 +58,8 @@ struct MasterInterface {
changeCoordinators = RequestStream< struct ChangeCoordinatorsRequest >( waitFailure.getEndpoint().getAdjustedEndpoint(2) );
getCommitVersion = RequestStream< struct GetCommitVersionRequest >( waitFailure.getEndpoint().getAdjustedEndpoint(3) );
notifyBackupWorkerDone = RequestStream<struct BackupWorkerDoneRequest>( waitFailure.getEndpoint().getAdjustedEndpoint(4) );
getLiveCommittedVersion = RequestStream< struct GetRawCommittedVersionRequest >( waitFailure.getEndpoint().getAdjustedEndpoint(5) );
reportLiveCommittedVersion = RequestStream< struct ReportRawCommittedVersionRequest>( waitFailure.getEndpoint().getAdjustedEndpoint(6) );
}
}
@ -64,6 +70,8 @@ struct MasterInterface {
streams.push_back(changeCoordinators.getReceiver());
streams.push_back(getCommitVersion.getReceiver(TaskPriority::GetConsistentReadVersion));
streams.push_back(notifyBackupWorkerDone.getReceiver());
streams.push_back(getLiveCommittedVersion.getReceiver(TaskPriority::GetLiveCommittedVersion));
streams.push_back(reportLiveCommittedVersion.getReceiver(TaskPriority::ReportLiveCommittedVersion));
FlowTransport::transport().addEndpoints(streams);
}
};
@ -170,6 +178,23 @@ struct GetCommitVersionRequest {
}
};
// Request carrying a committed version together with the accompanying lock flag and
// metadata version. The sender is acknowledged with an empty (Void) reply.
struct ReportRawCommittedVersionRequest {
	constexpr static FileIdentifier file_identifier = 1853148;

	// Version being reported; stays invalidVersion until explicitly set.
	Version version = invalidVersion;
	// Lock flag reported alongside the version.
	bool locked = false;
	// Metadata version reported alongside the version, if any.
	Optional<Value> metadataVersion;
	// Fulfilled by the receiver once the report has been handled.
	ReplyPromise<Void> reply;

	ReportRawCommittedVersionRequest() = default;

	ReportRawCommittedVersionRequest(Version version, bool locked, Optional<Value> metadataVersion)
	  : version(version), locked(locked), metadataVersion(metadataVersion) {}

	// Field order defines the wire layout; keep it in sync with the members above.
	template <class Ar>
	void serialize(Ar& ar) {
		serializer(ar, version, locked, metadataVersion, reply);
	}
};
struct BackupWorkerDoneRequest {
constexpr static FileIdentifier file_identifier = 8736351;
UID workerUID;

View File

@ -1181,7 +1181,7 @@ ACTOR Future<Void> commitBatch(
self->metadataVersion = v.metadataVersion;
self->committedVersion.set(v.version);
}
if (self->committedVersion.get() < commitVersion - SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS)
wait(delay(SERVER_KNOBS->PROXY_SPIN_DELAY));
}
@ -1278,9 +1278,16 @@ ACTOR Future<Void> commitBatch(
TEST(self->committedVersion.get() > commitVersion); // A later version was reported committed first
if( commitVersion > self->committedVersion.get() ) {
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
// Let master know this commit version so that every other proxy can know.
wait(self->master.reportLiveCommittedVersion.getReply(ReportRawCommittedVersionRequest(commitVersion, lockedAfter, metadataVersionAfter), TaskPriority::ProxyMasterVersionReply));
}
self->locked = lockedAfter;
self->metadataVersion = metadataVersionAfter;
self->committedVersion.set(commitVersion);
TEST(commitVersion < self->committedVersion.get());
if (commitVersion > self->committedVersion.get()) {
self->committedVersion.set(commitVersion);
}
}
if (forceRecovery) {
@ -1381,7 +1388,7 @@ ACTOR Future<Void> updateLastCommit(ProxyCommitData* self, Optional<UID> debugID
return Void();
}
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commitData, uint32_t flags, vector<MasterProxyInterface> *otherProxies, Optional<UID> debugID,
int transactionCount, int systemTransactionCount, int defaultPriTransactionCount, int batchPriTransactionCount)
{
// Returns a version which (1) is committed, and (2) is >= the latest version reported committed (by a commit response) when this request was sent
@ -1389,10 +1396,14 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
// (2) No proxy on our list reported committed a higher version before this request was received, because then its committedVersion would have been higher,
// and no other proxy could have already committed anything without first ending the epoch
++commitData->stats.txnStartBatch;
state vector<Future<GetReadVersionReply>> proxyVersions;
for (auto const& p : *otherProxies)
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
state Future<GetReadVersionReply> replyFromMasterFuture;
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
replyFromMasterFuture = commitData->master.getLiveCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::GetLiveCommittedVersionReply);
} else {
for (auto const& p : *otherProxies)
proxyVersions.push_back(brokenPromiseToNever(p.getRawCommittedVersion.getReply(GetRawCommittedVersionRequest(debugID), TaskPriority::TLogConfirmRunningReply)));
}
if (!SERVER_KNOBS->ALWAYS_CAUSAL_READ_RISKY && !(flags&GetReadVersionRequest::FLAG_CAUSAL_READ_RISKY)) {
wait(updateLastCommit(commitData, debugID));
@ -1404,18 +1415,25 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.confirmEpochLive");
}
vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
GetReadVersionReply rep;
rep.version = commitData->committedVersion.get();
state GetReadVersionReply rep;
rep.locked = commitData->locked;
rep.metadataVersion = commitData->metadataVersion;
rep.recentRequests = commitData->stats.getRecentRequests();
rep.version = commitData->committedVersion.get();
for (auto v : versions) {
if(v.version > rep.version) {
rep = v;
if (SERVER_KNOBS->ASK_READ_VERSION_FROM_MASTER) {
GetReadVersionReply replyFromMaster = wait(replyFromMasterFuture);
if (replyFromMaster.version > rep.version) {
rep = replyFromMaster;
}
} else {
vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
for (auto v : versions) {
if (v.version > rep.version) {
rep = v;
}
}
}
rep.recentRequests = commitData->stats.getRecentRequests();
if (debugID.present()) {
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");

View File

@ -174,6 +174,10 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
recoveryTransactionVersion; // The first version in this epoch
double lastCommitTime;
Version liveCommittedVersion; // The largest live committed version reported by proxies.
bool databaseLocked;
Optional<Value> proxyMetadataVersion;
DatabaseConfiguration originalConfiguration;
DatabaseConfiguration configuration;
std::vector<Optional<Key>> primaryDcId;
@ -251,6 +255,8 @@ struct MasterData : NonCopyable, ReferenceCounted<MasterData> {
primaryLocality(tagLocalityInvalid),
neverCreated(false),
lastEpochEnd(invalidVersion),
liveCommittedVersion(invalidVersion),
databaseLocked(false),
recoveryTransactionVersion(invalidVersion),
lastCommitTime(0),
registrationCount(0),
@ -997,6 +1003,34 @@ ACTOR Future<Void> provideVersions(Reference<MasterData> self) {
}
}
// Serves the master's two live-committed-version endpoints forever:
//  - getLiveCommittedVersion: replies with the currently tracked version, lock flag and metadata version.
//  - reportLiveCommittedVersion: adopts the reported state if its version is newer, then acknowledges.
ACTOR Future<Void> serveLiveCommittedVersion(Reference<MasterData> self) {
	loop {
		choose {
			when(GetRawCommittedVersionRequest getReq = waitNext(self->myInterface.getLiveCommittedVersion.getFuture())) {
				if (getReq.debugID.present()) {
					g_traceBatch.addEvent("TransactionDebug", getReq.debugID.get().first(), "MasterServer.serveLiveCommittedVersion.GetRawCommittedVersion");
				}

				// No version tracked yet: seed it from recoveryTransactionVersion before answering.
				if (self->liveCommittedVersion == invalidVersion) {
					self->liveCommittedVersion = self->recoveryTransactionVersion;
				}

				GetReadVersionReply response;
				response.version = self->liveCommittedVersion;
				response.locked = self->databaseLocked;
				response.metadataVersion = self->proxyMetadataVersion;
				getReq.reply.send(response);
			}
			when(ReportRawCommittedVersionRequest reportReq = waitNext(self->myInterface.reportLiveCommittedVersion.getFuture())) {
				// Only move forward; stale or duplicate reports leave the tracked state untouched.
				if (reportReq.version > self->liveCommittedVersion) {
					self->liveCommittedVersion = reportReq.version;
					self->databaseLocked = reportReq.locked;
					self->proxyMetadataVersion = reportReq.metadataVersion;
				}
				// Always acknowledge, whether or not the report advanced the version.
				reportReq.reply.send(Void());
			}
		}
	}
}
std::pair<KeyRangeRef, bool> findRange( CoalescedKeyRangeMap<int>& key_resolver, Standalone<VectorRef<ResolverMoveRef>>& movedRanges, int src, int dest ) {
auto ranges = key_resolver.ranges();
auto prev = ranges.begin();
@ -1531,6 +1565,7 @@ ACTOR Future<Void> masterCore( Reference<MasterData> self ) {
self->addActor.send( waitResolverFailure( self->resolvers ) );
self->addActor.send( waitProxyFailure( self->proxies ) );
self->addActor.send( provideVersions(self) );
self->addActor.send( serveLiveCommittedVersion(self) );
self->addActor.send( reportErrors(updateRegistration(self, self->logSystem), "UpdateRegistration", self->dbgid) );
self->registrationTrigger.trigger();

View File

@ -1191,6 +1191,9 @@ ACTOR Future<Void> workerServer(
DUMPTOKEN( recruited.tlogRejoin );
DUMPTOKEN( recruited.changeCoordinators );
DUMPTOKEN( recruited.getCommitVersion );
DUMPTOKEN( recruited.getLiveCommittedVersion);
DUMPTOKEN( recruited.reportLiveCommittedVersion);
DUMPTOKEN( recruited.notifyBackupWorkerDone);
//printf("Recruited as masterServer\n");
Future<Void> masterProcess = masterServer( recruited, dbInfo, ServerCoordinators( connFile ), req.lifetime, req.forceRecovery );

View File

@ -71,10 +71,13 @@ enum class TaskPriority {
ProxyResolverReply = 8547,
ProxyCommit = 8545,
ProxyCommitBatcher = 8540,
ReportLiveCommittedVersion = 8535,
TLogConfirmRunningReply = 8530,
TLogConfirmRunning = 8520,
ProxyGRVTimer = 8510,
GetConsistentReadVersion = 8500,
GetLiveCommittedVersionReply = 8490,
GetLiveCommittedVersion = 8480,
DefaultPromiseEndpoint = 8000,
DefaultOnMainThread = 7500,
DefaultDelay = 7010,