log routers now calculate a precise version to pop for their log router tag

This commit is contained in:
Evan Tschannen 2018-06-21 15:29:46 -07:00
parent b6ad8b1092
commit 68ac3bdc4c
8 changed files with 81 additions and 43 deletions

View File

@ -79,6 +79,7 @@ struct LogRouterData {
NotifiedVersion version;
NotifiedVersion minPopped;
Version startVersion;
Version minKnownCommittedVersion;
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
Tag routerTag;
bool allowPops;
@ -101,7 +102,7 @@ struct LogRouterData {
return newTagData;
}
LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()), version(req.startVersion-1), minPopped(req.startVersion-1), startVersion(req.startVersion), allowPops(false) {
LogRouterData(UID dbgid, InitializeLogRouterRequest req) : dbgid(dbgid), routerTag(req.routerTag), logSystem(new AsyncVar<Reference<ILogSystem>>()), version(req.startVersion-1), minPopped(req.startVersion-1), startVersion(req.startVersion), allowPops(false), minKnownCommittedVersion(0) {
//setup just enough of a logSet to be able to call getPushLocations
logSet.logServers.resize(req.tLogLocalities.size());
logSet.tLogPolicy = req.tLogPolicy;
@ -162,7 +163,7 @@ void commitMessages( LogRouterData* self, Version version, const std::vector<Tag
}
}
}
msgSize -= msg.message.size();
}
self->messageBlocks.push_back( std::make_pair(version, block) );
@ -193,6 +194,8 @@ ACTOR Future<Void> pullAsyncData( LogRouterData *self ) {
}
}
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, r->getMinKnownCommittedVersion());
state Version ver = 0;
state std::vector<TagsAndMessage> messages;
while (true) {
@ -306,6 +309,7 @@ ACTOR Future<Void> logRouterPeekMessages( LogRouterData* self, TLogPeekRequest r
TLogPeekReply reply;
reply.maxKnownVersion = self->version.get();
reply.minKnownCommittedVersion = self->minKnownCommittedVersion;
reply.messages = messages.toStringRef();
reply.popped = self->minPopped.get() >= self->startVersion ? self->minPopped.get() : 0;
reply.end = endVersion;
@ -340,10 +344,7 @@ ACTOR Future<Void> logRouterPop( LogRouterData* self, TLogPopRequest req ) {
}
if(self->logSystem->get() && self->allowPops) {
//The knownCommittedVersion might not be committed on the primary logs, so subtracting max_read_transaction_life_versions will ensure it is committed.
//We then need to subtract max_read_transaction_life_versions again to ensure we do not pop below the knownCommittedVersion of the primary logs.
//FIXME: if we get the true knownCommittedVersion when peeking from the primary logs we only need to subtract max_read_transaction_life_versions once.
self->logSystem->get()->pop(minKnownCommittedVersion - 2*SERVER_KNOBS->MAX_READ_TRANSACTION_LIFE_VERSIONS, self->routerTag);
self->logSystem->get()->pop(std::min(minKnownCommittedVersion, self->minKnownCommittedVersion), self->routerTag);
}
req.reply.send(Void());
self->minPopped.set(std::max(minPopped, self->minPopped.get()));

View File

@ -314,6 +314,8 @@ struct ILogSystem {
// Returns the maximum version known to have been pushed (not necessarily durably) into the log system (0 is always a possible result!)
virtual Version getMaxKnownVersion() { return 0; }
// Pure virtual; overridden by ServerPeekCursor, MergedPeekCursor, SetPeekCursor and MultiCursor declared later in this header.
// ServerPeekCursor returns the minKnownCommittedVersion field of the peek reply it holds; the other cursor types forward to one of their child cursors.
virtual Version getMinKnownCommittedVersion() = 0;
virtual void addref() = 0;
virtual void delref() = 0;
@ -358,6 +360,7 @@ struct ILogSystem {
virtual bool isExhausted();
virtual LogMessageVersion version();
virtual Version popped();
virtual Version getMinKnownCommittedVersion();
virtual void addref() {
ReferenceCounted<ServerPeekCursor>::addref();
@ -411,6 +414,7 @@ struct ILogSystem {
virtual bool isExhausted();
virtual LogMessageVersion version();
virtual Version popped();
virtual Version getMinKnownCommittedVersion();
virtual void addref() {
ReferenceCounted<MergedPeekCursor>::addref();
@ -455,6 +459,7 @@ struct ILogSystem {
virtual bool isExhausted();
virtual LogMessageVersion version();
virtual Version popped();
virtual Version getMinKnownCommittedVersion();
virtual void addref() {
ReferenceCounted<SetPeekCursor>::addref();
@ -488,6 +493,7 @@ struct ILogSystem {
virtual bool isExhausted();
virtual LogMessageVersion version();
virtual Version popped();
virtual Version getMinKnownCommittedVersion();
virtual void addref() {
ReferenceCounted<MultiCursor>::addref();
@ -516,7 +522,7 @@ struct ILogSystem {
// Never returns normally, but throws an error if the subsystem stops working
//Future<Void> push( UID bundle, int64_t seq, VectorRef<TaggedMessageRef> messages );
virtual Future<Void> push( Version prevVersion, Version version, Version knownCommittedVersion, struct LogPushData& data, Optional<UID> debugID = Optional<UID>() ) = 0;
// The resulting Version is defined by the implementation. In TagPartitionedLogSystem::push it is the minimum of the Versions
// returned by the tLog commit replies that were ready and error-free once every local log set's commit quorum had completed.
// minKnownCommittedVersion is forwarded to each tLog in its TLogCommitRequest.
virtual Future<Version> push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, struct LogPushData& data, Optional<UID> debugID = Optional<UID>() ) = 0;
// Waits for the version number of the bundle (in this epoch) to be prevVersion (i.e. for all pushes ordered earlier)
// Puts the given messages into the bundle, each with the given tags, and with message versions (version, 0) - (version, N)
// Changes the version number of the bundle to be version (unblocking the next push)
@ -535,7 +541,7 @@ struct ILogSystem {
// Same contract as peek(), but blocks until the preferred log server(s) for the given tag are available (and is correspondingly less expensive)
virtual Reference<IPeekCursor> peekLogRouter( UID dbgid, Version begin, Tag tag ) = 0;
// Same contract as peek(), but can only peek from the logs elected in the same generation.
// Same contract as peek(), but can only peek from the logs elected in the same generation.
// If the preferred log server is down, a different log from the same generation will merge results locally before sending them to the log router.
virtual void pop( Version upTo, Tag tag, Version knownCommittedVersion = 0, int8_t popLocality = tagLocalityInvalid ) = 0;

View File

@ -26,6 +26,7 @@
ILogSystem::ServerPeekCursor::ServerPeekCursor( Reference<AsyncVar<OptionalInterface<TLogInterface>>> const& interf, Tag tag, Version begin, Version end, bool returnIfBlocked, bool parallelGetMore )
: interf(interf), tag(tag), messageVersion(begin), end(end), hasMsg(false), rd(results.arena, results.messages, Unversioned()), randomID(g_random->randomUniqueID()), poppedVersion(0), returnIfBlocked(returnIfBlocked), sequence(0), parallelGetMore(parallelGetMore) {
this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0;
//TraceEvent("SPC_Starting", randomID).detail("Tag", tag.toString()).detail("Begin", begin).detail("End", end).backtrace();
}
@ -34,6 +35,7 @@ ILogSystem::ServerPeekCursor::ServerPeekCursor( TLogPeekReply const& results, Lo
{
//TraceEvent("SPC_Clone", randomID);
this->results.maxKnownVersion = 0;
this->results.minKnownCommittedVersion = 0;
if(hasMsg)
nextMessage();
@ -254,6 +256,8 @@ bool ILogSystem::ServerPeekCursor::isExhausted() {
LogMessageVersion ILogSystem::ServerPeekCursor::version() { return messageVersion; } // Call only after nextMessage(). The sequence of the current message, or results.end if nextMessage() has returned false.
// Returns the minKnownCommittedVersion field of the TLogPeekReply held in `results`; both constructors initialize that field to 0.
Version ILogSystem::ServerPeekCursor::getMinKnownCommittedVersion() { return results.minKnownCommittedVersion; }
// Returns this cursor's poppedVersion member (initialized to 0 by the interface-based constructor).
Version ILogSystem::ServerPeekCursor::popped() { return poppedVersion; }
ILogSystem::MergedPeekCursor::MergedPeekCursor( vector< Reference<ILogSystem::IPeekCursor> > const& serverCursors, Version begin, bool collectTags )
@ -485,6 +489,10 @@ bool ILogSystem::MergedPeekCursor::isExhausted() {
// Returns the merged cursor's current messageVersion member.
LogMessageVersion ILogSystem::MergedPeekCursor::version() { return messageVersion; }
// Forwards to the child cursor selected by currentCursor; the other entries of serverCursors are not consulted.
// NOTE(review): assumes currentCursor is a valid index into serverCursors whenever this is called — confirm against callers.
Version ILogSystem::MergedPeekCursor::getMinKnownCommittedVersion() {
return serverCursors[currentCursor]->getMinKnownCommittedVersion();
}
Version ILogSystem::MergedPeekCursor::popped() {
Version poppedVersion = 0;
for (auto& c : serverCursors)
@ -776,6 +784,10 @@ bool ILogSystem::SetPeekCursor::isExhausted() {
// Returns the set cursor's current messageVersion member.
LogMessageVersion ILogSystem::SetPeekCursor::version() { return messageVersion; }
// Forwards to the child cursor at serverCursors[currentSet][currentCursor]; no other log set or cursor is consulted.
// NOTE(review): assumes currentSet and currentCursor are valid indices whenever this is called — confirm against callers.
Version ILogSystem::SetPeekCursor::getMinKnownCommittedVersion() {
return serverCursors[currentSet][currentCursor]->getMinKnownCommittedVersion();
}
Version ILogSystem::SetPeekCursor::popped() {
Version poppedVersion = 0;
for (auto& cursors : serverCursors) {
@ -858,6 +870,10 @@ LogMessageVersion ILogSystem::MultiCursor::version() {
return cursors.back()->version();
}
// Forwards to the last cursor in `cursors` (cursors.back()), the same cursor version() and popped() consult.
Version ILogSystem::MultiCursor::getMinKnownCommittedVersion() {
return cursors.back()->getMinKnownCommittedVersion();
}
// Returns the larger of this object's own poppedVersion and the popped() value reported by the last cursor in `cursors`.
Version ILogSystem::MultiCursor::popped() {
return std::max(poppedVersion, cursors.back()->popped());
}

View File

@ -175,7 +175,8 @@ struct ProxyCommitData {
LogSystemDiskQueueAdapter* logAdapter;
Reference<ILogSystem> logSystem;
IKeyValueStore* txnStateStore;
NotifiedVersion committedVersion; // Provided that this recovery has succeeded or will succeed, this version is fully committed (durable)
NotifiedVersion committedVersion; // Provided that this recovery has succeeded or will succeed, this version is fully committed (durable)
Version minKnownCommittedVersion; // No version smaller than this one will be used as the known committed version during recovery
Version version; // The version at which txnStateStore is up to date
Promise<Void> validState; // Set once txnStateStore and version are valid
double lastVersionTime;
@ -223,9 +224,9 @@ struct ProxyCommitData {
}
ProxyCommitData(UID dbgid, MasterInterface master, RequestStream<GetReadVersionRequest> getConsistentReadVersion, Version recoveryTransactionVersion, RequestStream<CommitTransactionRequest> commit, Reference<AsyncVar<ServerDBInfo>> db, bool firstProxy)
: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
: dbgid(dbgid), stats(dbgid, &version, &committedVersion, &commitBatchesMemBytesCount), master(master),
logAdapter(NULL), txnStateStore(NULL),
committedVersion(recoveryTransactionVersion), version(0),
committedVersion(recoveryTransactionVersion), version(0), minKnownCommittedVersion(0),
lastVersionTime(0), commitVersionRequestNumber(1), mostRecentProcessedRequestNumber(0),
getConsistentReadVersion(getConsistentReadVersion), commit(commit), lastCoalesceTime(0),
localCommitBatchesStarted(0), locked(false), firstProxy(firstProxy),
@ -794,8 +795,7 @@ ACTOR Future<Void> commitBatch(
if ( prevVersion && commitVersion - prevVersion < SERVER_KNOBS->MAX_VERSIONS_IN_FLIGHT/2 )
debug_advanceMaxCommittedVersion(UID(), commitVersion);
Future<Void> loggingComplete = self->logSystem->push( prevVersion, commitVersion, self->committedVersion.get(), toCommit, debugID )
|| self->committedVersion.whenAtLeast( commitVersion+1 );
Future<Version> loggingComplete = self->logSystem->push( prevVersion, commitVersion, self->committedVersion.get(), self->minKnownCommittedVersion, toCommit, debugID );
if (!forceRecovery) {
ASSERT(self->latestLocalCommitBatchLogging.get() == localBatchNumber-1);
@ -803,7 +803,20 @@ ACTOR Future<Void> commitBatch(
}
/////// Phase 4: Logging (network bound; pipelined up to MAX_READ_TRANSACTION_LIFE_VERSIONS (limited by loop above))
Void _ = wait(loggingComplete);
try {
choose {
when(Version ver = wait(loggingComplete)) {
self->minKnownCommittedVersion = std::max(self->minKnownCommittedVersion, ver);
}
when(Void _ = wait(self->committedVersion.whenAtLeast( commitVersion+1 ))) {}
}
} catch(Error &e) {
if(e.code() == error_code_broken_promise) {
throw master_tlog_failed();
}
throw;
}
Void _ = wait(yield());
self->logSystem->pop(msg.popTo, txsTag);

View File

@ -952,6 +952,7 @@ namespace oldTLog {
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = 0;
if(poppedVer > req.begin) {
reply.popped = poppedVer;
reply.end = poppedVer;

View File

@ -127,7 +127,7 @@ struct VerUpdateRef {
VerUpdateRef( Arena& to, const VerUpdateRef& from ) : version(from.version), mutations( to, from.mutations ), isPrivateData( from.isPrivateData ) {}
int expectedSize() const { return mutations.expectedSize(); }
template <class Ar>
template <class Ar>
void serialize( Ar& ar ) {
ar & version & mutations & isPrivateData;
}
@ -139,10 +139,11 @@ struct TLogPeekReply {
Version end;
Optional<Version> popped;
Version maxKnownVersion;
Version minKnownCommittedVersion;
template <class Ar>
void serialize(Ar& ar) {
ar & arena & messages & end & popped & maxKnownVersion;
ar & arena & messages & end & popped & maxKnownVersion & minKnownCommittedVersion;
}
};
@ -198,19 +199,19 @@ struct TagMessagesRef {
struct TLogCommitRequest {
Arena arena;
Version prevVersion, version, knownCommittedVersion;
Version prevVersion, version, knownCommittedVersion, minKnownCommittedVersion;
StringRef messages;// Each message prefixed by a 4-byte length
ReplyPromise<Void> reply;
ReplyPromise<Version> reply;
Optional<UID> debugID;
TLogCommitRequest() {}
TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, StringRef messages, Optional<UID> debugID )
: arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), messages(messages), debugID(debugID) {}
template <class Ar>
TLogCommitRequest( const Arena& a, Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, StringRef messages, Optional<UID> debugID )
: arena(a), prevVersion(prevVersion), version(version), knownCommittedVersion(knownCommittedVersion), minKnownCommittedVersion(minKnownCommittedVersion), messages(messages), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
ar & prevVersion & version & knownCommittedVersion & messages & reply & arena & debugID;
ar & prevVersion & version & knownCommittedVersion & minKnownCommittedVersion & messages & reply & arena & debugID;
}
};

View File

@ -360,7 +360,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
VersionMetricHandle persistentDataVersion, persistentDataDurableVersion; // The last version number in the portion of the log (written|durable) to persistentData
NotifiedVersion version, queueCommittedVersion;
Version queueCommittingVersion;
Version knownCommittedVersion, durableKnownCommittedVersion;
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
@ -409,7 +409,7 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
explicit LogData(TLogData* tLogData, TLogInterface interf, Tag remoteTag, bool isPrimary, int logRouterTags, UID recruitmentID) : tLogData(tLogData), knownCommittedVersion(1), logId(interf.id()),
cc("TLog", interf.id().toString()), bytesInput("BytesInput", cc), bytesDurable("BytesDurable", cc), remoteTag(remoteTag), isPrimary(isPrimary), logRouterTags(logRouterTags), recruitmentID(recruitmentID),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0),
logSystem(new AsyncVar<Reference<ILogSystem>>()), logRouterPoppedVersion(0), durableKnownCommittedVersion(0), minKnownCommittedVersion(0),
// These are initialized differently on init() or recovery
recoveryCount(), stopped(false), initialized(false), queueCommittingVersion(0), newPersistentDataVersion(invalidVersion), unrecoveredBefore(1), recoveredAt(1), unpoppedRecoveredTags(0),
logRouterPopToVersion(0), locality(tagLocalityInvalid)
@ -990,6 +990,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if(poppedVer > req.begin) {
TLogPeekReply rep;
rep.maxKnownVersion = logData->version.get();
rep.minKnownCommittedVersion = logData->minKnownCommittedVersion;
rep.popped = poppedVer;
rep.end = poppedVer;
@ -1048,6 +1049,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
TLogPeekReply reply;
reply.maxKnownVersion = logData->version.get();
reply.minKnownCommittedVersion = logData->minKnownCommittedVersion;
reply.messages = messages.toStringRef();
reply.end = endVersion;
@ -1167,6 +1169,7 @@ ACTOR Future<Void> tLogCommit(
}
logData->knownCommittedVersion = std::max(logData->knownCommittedVersion, req.knownCommittedVersion);
logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, req.minKnownCommittedVersion);
Void _ = wait( logData->version.whenAtLeast( req.prevVersion ) );
@ -1232,7 +1235,7 @@ ACTOR Future<Void> tLogCommit(
if(req.debugID.present())
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
req.reply.send( Void() );
req.reply.send( logData->durableKnownCommittedVersion );
return Void();
}

View File

@ -30,17 +30,15 @@
#include "fdbrpc/ReplicationUtils.h"
#include "RecoveryState.h"
ACTOR static Future<Void> reportTLogCommitErrors( Future<Void> commitReply, UID debugID ) {
try {
Void _ = wait(commitReply);
return Void();
} catch (Error& e) {
if (e.code() == error_code_broken_promise)
throw master_tlog_failed();
else if (e.code() != error_code_actor_cancelled && e.code() != error_code_tlog_stopped)
TraceEvent(SevError, "MasterTLogCommitRequestError", debugID).error(e);
throw;
// Waits for `f`, then returns the smallest Version among the entries of `replies` that are already
// ready and did not fail. Replies that are still pending or that hold an error are ignored; nothing
// here waits on them. TagPartitionedLogSystem::push passes waitForAll(quorumResults) as `f` and the
// per-tLog commit replies as `replies`.
// NOTE(review): if no reply is ready and error-free, the result is std::numeric_limits<Version>::max() — confirm callers cannot observe that value.
ACTOR Future<Version> minVersionWhenReady( Future<Void> f, std::vector<Future<Version>> replies) {
Void _ = wait(f);
// Start from the largest representable Version so that any inspected reply can only lower it.
Version minVersion = std::numeric_limits<Version>::max();
for(auto& reply : replies) {
// Skip replies that have not completed yet or that completed with an error.
if(reply.isReady() && !reply.isError()) {
minVersion = std::min(minVersion, reply.get());
}
}
return minVersion;
}
struct OldLogData {
@ -391,27 +389,26 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
virtual Future<Void> push( Version prevVersion, Version version, Version knownCommittedVersion, LogPushData& data, Optional<UID> debugID ) {
virtual Future<Version> push( Version prevVersion, Version version, Version knownCommittedVersion, Version minKnownCommittedVersion, LogPushData& data, Optional<UID> debugID ) {
// FIXME: Randomize request order as in LegacyLogSystem?
vector<Future<Void>> quorumResults;
vector<Future<Version>> allReplies;
int location = 0;
for(auto& it : tLogs) {
if(it->isLocal && it->logServers.size()) {
vector<Future<Void>> tLogCommitResults;
for(int loc=0; loc< it->logServers.size(); loc++) {
Future<Void> commitMessage = reportTLogCommitErrors(
it->logServers[loc]->get().interf().commit.getReply(
TLogCommitRequest( data.getArena(), prevVersion, version, knownCommittedVersion, data.getMessages(location), debugID ), TaskTLogCommitReply ),
getDebugID());
addActor.get().send(commitMessage);
tLogCommitResults.push_back(commitMessage);
allReplies.push_back( it->logServers[loc]->get().interf().commit.getReply( TLogCommitRequest( data.getArena(), prevVersion, version, knownCommittedVersion, minKnownCommittedVersion, data.getMessages(location), debugID ), TaskTLogCommitReply ) );
Future<Void> commitSuccess = success(allReplies.back());
addActor.get().send(commitSuccess);
tLogCommitResults.push_back(commitSuccess);
location++;
}
quorumResults.push_back( quorum( tLogCommitResults, tLogCommitResults.size() - it->tLogWriteAntiQuorum ) );
}
}
return waitForAll(quorumResults);
return minVersionWhenReady( waitForAll(quorumResults), allReplies);
}
Reference<IPeekCursor> peekAll( UID dbgid, Version begin, Version end, Tag tag, bool parallelGetMore, bool throwIfDead ) {