Fixed a priority inversion where the tlog would prefer copying data from the previous generation over making data durable (which led to it being ratekeeper-controlled).
This commit is contained in:
parent
29ebb19388
commit
de119f192d
|
@ -71,7 +71,7 @@ struct ILogSystem {
|
|||
|
||||
//returns immediately if hasMessage() returns true.
|
||||
//returns when either the result of hasMessage() or version() has changed.
|
||||
virtual Future<Void> getMore() = 0;
|
||||
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply) = 0;
|
||||
|
||||
//returns when the failure monitor detects that the servers associated with the cursor are failed
|
||||
virtual Future<Void> onFailed() = 0;
|
||||
|
@ -137,7 +137,7 @@ struct ILogSystem {
|
|||
|
||||
virtual void advanceTo(LogMessageVersion n);
|
||||
|
||||
virtual Future<Void> getMore();
|
||||
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply);
|
||||
|
||||
virtual Future<Void> onFailed();
|
||||
|
||||
|
@ -159,7 +159,7 @@ struct ILogSystem {
|
|||
};
|
||||
|
||||
struct MergedPeekCursor : IPeekCursor, ReferenceCounted<MergedPeekCursor> {
|
||||
LocalityGroup localityGroup;
|
||||
LocalityGroup localityGroup;
|
||||
std::vector< std::pair<LogMessageVersion, int> > sortedVersions;
|
||||
vector< Reference<IPeekCursor> > serverCursors;
|
||||
Tag tag;
|
||||
|
@ -168,8 +168,8 @@ struct ILogSystem {
|
|||
LogMessageVersion messageVersion;
|
||||
bool hasNextMessage;
|
||||
UID randomID;
|
||||
int tLogReplicationFactor;
|
||||
IRepPolicyRef tLogPolicy;
|
||||
int tLogReplicationFactor;
|
||||
IRepPolicyRef tLogPolicy;
|
||||
std::vector< LocalityData > tLogLocalities;
|
||||
|
||||
MergedPeekCursor( std::vector<Reference<AsyncVar<OptionalInterface<TLogInterface>>>> const& logServers, int bestServer, int readQuorum, Tag tag, Version begin, Version end, bool parallelGetMore, std::vector< LocalityData > const& tLogLocalities, IRepPolicyRef const tLogPolicy, int tLogReplicationFactor );
|
||||
|
@ -198,7 +198,7 @@ struct ILogSystem {
|
|||
|
||||
virtual void advanceTo(LogMessageVersion n);
|
||||
|
||||
virtual Future<Void> getMore();
|
||||
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply);
|
||||
|
||||
virtual Future<Void> onFailed();
|
||||
|
||||
|
@ -240,7 +240,7 @@ struct ILogSystem {
|
|||
|
||||
virtual void advanceTo(LogMessageVersion n);
|
||||
|
||||
virtual Future<Void> getMore();
|
||||
virtual Future<Void> getMore(int taskID = TaskTLogPeekReply);
|
||||
|
||||
virtual Future<Void> onFailed();
|
||||
|
||||
|
|
|
@ -114,7 +114,7 @@ void ILogSystem::ServerPeekCursor::advanceTo(LogMessageVersion n) {
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self ) {
|
||||
ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self, int taskID ) {
|
||||
if( !self->interf || self->messageVersion >= self->end ) {
|
||||
Void _ = wait( Future<Void>(Never()));
|
||||
throw internal_error();
|
||||
|
@ -127,7 +127,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||
loop {
|
||||
try {
|
||||
while(self->futureResults.size() < SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS && self->interf->get().present()) {
|
||||
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, std::make_pair(self->randomID, self->sequence++)), TaskTLogPeekReply) ) );
|
||||
self->futureResults.push_back( brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked, std::make_pair(self->randomID, self->sequence++)), taskID) ) );
|
||||
}
|
||||
|
||||
choose {
|
||||
|
@ -168,7 +168,7 @@ ACTOR Future<Void> serverPeekParallelGetMore( ILogSystem::ServerPeekCursor* self
|
|||
}
|
||||
}
|
||||
|
||||
ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self ) {
|
||||
ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self, int taskID ) {
|
||||
if( !self->interf || self->messageVersion >= self->end ) {
|
||||
Void _ = wait( Future<Void>(Never()));
|
||||
throw internal_error();
|
||||
|
@ -177,7 +177,7 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self ) {
|
|||
loop {
|
||||
choose {
|
||||
when( TLogPeekReply res = wait( self->interf->get().present() ?
|
||||
brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked), TaskTLogPeekReply) ) : Never() ) ) {
|
||||
brokenPromiseToNever( self->interf->get().interf().peekMessages.getReply(TLogPeekRequest(self->messageVersion.version,self->tag,self->returnIfBlocked), taskID) ) : Never() ) ) {
|
||||
self->results = res;
|
||||
if(res.popped.present())
|
||||
self->poppedVersion = std::min( std::max(self->poppedVersion, res.popped.get()), self->end.version );
|
||||
|
@ -201,12 +201,12 @@ ACTOR Future<Void> serverPeekGetMore( ILogSystem::ServerPeekCursor* self ) {
|
|||
}
|
||||
}
|
||||
|
||||
// Returns a future that completes once this server cursor may have more data.
//  - If hasMessage() is already true, completes immediately with Void().
//  - Otherwise a fetch actor (serverPeekParallelGetMore or serverPeekGetMore,
//    selected by parallelGetMore) is started only when no previous fetch
//    exists or the previous one has finished; the stored future `more` is
//    then returned.
// taskID (new signature) is forwarded to the fetch actor, which hands it to
// peekMessages.getReply(...) in place of the previously hard-coded
// TaskTLogPeekReply — presumably the priority at which the reply is handled.
// NOTE(review): this is a diff rendering; the signature/assignment without
// taskID is the pre-change line and the one with taskID is its replacement.
Future<Void> ILogSystem::ServerPeekCursor::getMore() {
|
||||
Future<Void> ILogSystem::ServerPeekCursor::getMore(int taskID) {
|
||||
//TraceEvent("SPC_getMore", randomID).detail("hasMessage", hasMessage()).detail("more", !more.isValid() || more.isReady()).detail("messageVersion", messageVersion.toString()).detail("end", end.toString());
|
||||
// Fast path: a message is already buffered, nothing to fetch.
if( hasMessage() )
|
||||
return Void();
|
||||
// Only launch a new fetch when none is in flight. An in-flight `more` is
// reused as-is, so the taskID of this call does not affect a fetch that was
// already started by an earlier call.
if( !more.isValid() || more.isReady() ) {
|
||||
more = parallelGetMore ? serverPeekParallelGetMore(this) : serverPeekGetMore(this);
|
||||
more = parallelGetMore ? serverPeekParallelGetMore(this, taskID) : serverPeekGetMore(this, taskID);
|
||||
}
|
||||
return more;
|
||||
}
|
||||
|
@ -372,17 +372,17 @@ void ILogSystem::MergedPeekCursor::advanceTo(LogMessageVersion n) {
|
|||
calcHasMessage();
|
||||
}
|
||||
|
||||
ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion) {
|
||||
ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion, int taskID) {
|
||||
loop {
|
||||
//TraceEvent("MPC_getMoreA", self->randomID).detail("start", startVersion.toString());
|
||||
if(self->serverCursors[self->bestServer]->isActive()) {
|
||||
ASSERT(!self->serverCursors[self->bestServer]->hasMessage());
|
||||
Void _ = wait( self->serverCursors[self->bestServer]->getMore() || self->serverCursors[self->bestServer]->onFailed() );
|
||||
Void _ = wait( self->serverCursors[self->bestServer]->getMore(taskID) || self->serverCursors[self->bestServer]->onFailed() );
|
||||
} else {
|
||||
vector<Future<Void>> q;
|
||||
for (auto& c : self->serverCursors)
|
||||
if (!c->hasMessage())
|
||||
q.push_back(c->getMore());
|
||||
q.push_back(c->getMore(taskID));
|
||||
Void _ = wait(quorum(q, 1));
|
||||
}
|
||||
self->calcHasMessage();
|
||||
|
@ -392,7 +392,7 @@ ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMess
|
|||
}
|
||||
}
|
||||
|
||||
Future<Void> ILogSystem::MergedPeekCursor::getMore() {
|
||||
Future<Void> ILogSystem::MergedPeekCursor::getMore(int taskID) {
|
||||
auto startVersion = version();
|
||||
calcHasMessage();
|
||||
if( hasMessage() )
|
||||
|
@ -403,7 +403,7 @@ Future<Void> ILogSystem::MergedPeekCursor::getMore() {
|
|||
if (version() > startVersion)
|
||||
return Void();
|
||||
|
||||
return mergedPeekGetMore(this, startVersion);
|
||||
return mergedPeekGetMore(this, startVersion, taskID);
|
||||
}
|
||||
|
||||
Future<Void> ILogSystem::MergedPeekCursor::onFailed() {
|
||||
|
@ -464,13 +464,13 @@ void ILogSystem::MultiCursor::advanceTo(LogMessageVersion n) {
|
|||
cursors.back()->advanceTo(n);
|
||||
}
|
||||
|
||||
// Discards exhausted trailing cursors, then asks the remaining last cursor for
// more data, forwarding taskID to it (new signature).
// NOTE(review): this is a diff rendering; the signature/return without taskID
// is the pre-change line and the one with taskID is its replacement.
Future<Void> ILogSystem::MultiCursor::getMore() {
|
||||
Future<Void> ILogSystem::MultiCursor::getMore(int taskID) {
|
||||
// Drop the last cursor while its version has reached the matching entry in
// epochEnds. The `size() > 1` guard always leaves at least one cursor, so
// cursors.back() below is valid as long as cursors was non-empty on entry.
while( cursors.size() > 1 && cursors.back()->version() >= epochEnds.back() ) {
|
||||
// Keep the highest popped version seen across the discarded cursors.
poppedVersion = std::max(poppedVersion, cursors.back()->popped());
|
||||
cursors.pop_back();
|
||||
epochEnds.pop_back();
|
||||
}
|
||||
return cursors.back()->getMore();
|
||||
return cursors.back()->getMore(taskID);
|
||||
}
|
||||
|
||||
Future<Void> ILogSystem::MultiCursor::onFailed() {
|
||||
|
|
|
@ -1501,7 +1501,7 @@ bool tlogTerminated( TLogData* self, IKeyValueStore* persistentData, TLogQueue*
|
|||
return false;
|
||||
}
|
||||
|
||||
ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> logData, Version beginVersion, Version endVersion, Tag tag, Reference<AsyncVar<int>> uncommittedBytes, Reference<AsyncVar<Reference<ILogSystem>>> logSystem ) {
|
||||
ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> logData, Version beginVersion, Version endVersion, Tag tag, Reference<AsyncVar<int>> uncommittedBytes, Reference<AsyncVar<Reference<ILogSystem>>> logSystem, int taskID ) {
|
||||
state Future<Void> dbInfoChange = Void();
|
||||
state Reference<ILogSystem::IPeekCursor> r;
|
||||
state Version tagAt = beginVersion;
|
||||
|
@ -1513,7 +1513,7 @@ ACTOR Future<Void> recoverTagFromLogSystem( TLogData* self, Reference<LogData> l
|
|||
while (tagAt <= endVersion) {
|
||||
loop {
|
||||
choose {
|
||||
when(Void _ = wait( r ? r->getMore() : Never() ) ) {
|
||||
when(Void _ = wait( r ? r->getMore(taskID) : Never() ) ) {
|
||||
break;
|
||||
}
|
||||
when( Void _ = wait( dbInfoChange ) ) {
|
||||
|
@ -1603,7 +1603,7 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
|
|||
state Future<Void> updater = updateLogSystem(self, logData, recoverFrom, logSystem);
|
||||
|
||||
for(auto tag : recoverTags )
|
||||
recoverFutures.push_back(recoverTagFromLogSystem(self, logData, knownCommittedVersion, recoverAt, tag, uncommittedBytes, logSystem));
|
||||
recoverFutures.push_back(recoverTagFromLogSystem(self, logData, knownCommittedVersion, recoverAt, tag, uncommittedBytes, logSystem, TaskTLogPeekReply));
|
||||
|
||||
state Future<Void> copyDone = waitForAll(recoverFutures);
|
||||
state Future<Void> recoveryDone = Never();
|
||||
|
@ -1615,7 +1615,7 @@ ACTOR Future<Void> recoverFromLogSystem( TLogData* self, Reference<LogData> logD
|
|||
when(Void _ = wait(copyDone)) {
|
||||
recoverFutures.clear();
|
||||
for(auto tag : recoverTags )
|
||||
recoverFutures.push_back(recoverTagFromLogSystem(self, logData, 0, knownCommittedVersion, tag, uncommittedBytes, logSystem));
|
||||
recoverFutures.push_back(recoverTagFromLogSystem(self, logData, 0, knownCommittedVersion, tag, uncommittedBytes, logSystem, TaskBatchCopy));
|
||||
copyDone = Never();
|
||||
recoveryDone = waitForAll(recoverFutures);
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ enum {
|
|||
TaskDataDistribution = 3500,
|
||||
TaskDiskWrite = 3010,
|
||||
TaskUpdateStorage = 3000,
|
||||
TaskBatchCopy = 2900,
|
||||
TaskLowPriority = 2000,
|
||||
|
||||
TaskMinPriority = 1000
|
||||
|
|
Loading…
Reference in New Issue