fixed even more bugs

This commit is contained in:
Evan Tschannen 2017-07-15 15:15:03 -07:00
parent 5ac4de8775
commit 5852a6301b
5 changed files with 53 additions and 10 deletions

View File

@ -341,7 +341,9 @@ ACTOR Future<Void> logRouterPop( LogRouterData* self, TLogPopRequest req ) {
Void _ = wait(yield(TaskUpdateStorage));
}
if(self->logSystem->get()) {
self->logSystem->get()->pop(minPopped, self->routerTag);
}
req.reply.send(Void());
return Void();
}

View File

@ -189,6 +189,9 @@ struct ILogSystem {
// (3) the cursor cannot return any more results
virtual bool isActive() = 0;
//returns true if the cursor cannot return any more results
virtual bool isExhausted() = 0;
// Returns the smallest possible message version which the current message (if any) or a subsequent message might have
// (If hasMessage(), this is therefore the message version of the current message)
virtual LogMessageVersion version() = 0;
@ -252,6 +255,8 @@ struct ILogSystem {
virtual bool isActive();
virtual bool isExhausted();
virtual LogMessageVersion version();
virtual Version popped();
@ -313,6 +318,8 @@ struct ILogSystem {
virtual bool isActive();
virtual bool isExhausted();
virtual LogMessageVersion version();
virtual Version popped();
@ -369,6 +376,8 @@ struct ILogSystem {
virtual bool isActive();
virtual bool isExhausted();
virtual LogMessageVersion version();
virtual Version popped();
@ -413,6 +422,8 @@ struct ILogSystem {
virtual bool isActive();
virtual bool isExhausted();
virtual LogMessageVersion version();
virtual Version popped();

View File

@ -233,6 +233,10 @@ bool ILogSystem::ServerPeekCursor::isActive() {
return IFailureMonitor::failureMonitor().getState( interf->get().interf().peekMessages.getEndpoint() ).isAvailable();
}
bool ILogSystem::ServerPeekCursor::isExhausted() {
return messageVersion >= end;
}
LogMessageVersion ILogSystem::ServerPeekCursor::version() { return messageVersion; } // Call only after nextMessage(). The sequence of the current message, or results.end if nextMessage() has returned false.
Version ILogSystem::ServerPeekCursor::popped() { return poppedVersion; }
@ -356,6 +360,10 @@ void ILogSystem::MergedPeekCursor::advanceTo(LogMessageVersion n) {
}
ACTOR Future<Void> mergedPeekGetMore(ILogSystem::MergedPeekCursor* self, LogMessageVersion startVersion) {
if(self->bestServer >= 0 && self->serverCursors[self->bestServer]->isExhausted()) {
return Never();
}
loop {
//TraceEvent("MPC_getMoreA", self->randomID).detail("start", startVersion.toString());
if(self->bestServer >= 0 && self->serverCursors[self->bestServer]->isActive()) {
@ -402,6 +410,11 @@ bool ILogSystem::MergedPeekCursor::isActive() {
return false;
}
bool ILogSystem::MergedPeekCursor::isExhausted() {
ASSERT(false);
return false;
}
LogMessageVersion ILogSystem::MergedPeekCursor::version() { return messageVersion; }
Version ILogSystem::MergedPeekCursor::popped() {
@ -469,8 +482,8 @@ void ILogSystem::SetPeekCursor::calcHasMessage() {
}
}
if(useBestSet) {
hasNextMessage = false;
if(useBestSet) {
updateMessage(bestSet, false); // Use Quorum logic
if(!hasNextMessage) {
@ -575,12 +588,17 @@ void ILogSystem::SetPeekCursor::advanceTo(LogMessageVersion n) {
ACTOR Future<Void> setPeekGetMore(ILogSystem::SetPeekCursor* self, LogMessageVersion startVersion) {
loop {
//TraceEvent("LPC_getMoreA", self->randomID).detail("start", startVersion.toString());
if(self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isExhausted()) {
return Never();
}
//TraceEvent("LPC_getMore1", self->randomID).detail("start", startVersion.toString()).detail("t", self->tag);
if(self->bestServer >= 0 && self->bestSet >= 0 && self->serverCursors[self->bestSet][self->bestServer]->isActive()) {
ASSERT(!self->serverCursors[self->bestSet][self->bestServer]->hasMessage());
Void _ = wait( self->serverCursors[self->bestSet][self->bestServer]->getMore() || self->serverCursors[self->bestSet][self->bestServer]->onFailed() );
self->useBestSet = true;
} else {
//FIXME: if best set is exhausted, do not peek remote servers
bool bestSetValid = self->bestSet >= 0;
if(bestSetValid) {
self->localityGroup.clear();
@ -645,6 +663,11 @@ bool ILogSystem::SetPeekCursor::isActive() {
return false;
}
bool ILogSystem::SetPeekCursor::isExhausted() {
ASSERT(false);
return false;
}
LogMessageVersion ILogSystem::SetPeekCursor::version() { return messageVersion; }
Version ILogSystem::SetPeekCursor::popped() {
@ -717,6 +740,10 @@ bool ILogSystem::MultiCursor::isActive() {
return cursors.back()->isActive();
}
bool ILogSystem::MultiCursor::isExhausted() {
return cursors.back()->isActive();
}
LogMessageVersion ILogSystem::MultiCursor::version() {
return cursors.back()->version();
}

View File

@ -729,7 +729,9 @@ ACTOR Future<Void> commitBatch(
// txnState (transaction subsystem state) tag: message extracted from log adapter
bool firstMessage = true;
for(auto m : msg.messages) {
if(firstMessage) {
toCommit.addTag(txsTag);
}
toCommit.addMessage(StringRef(m.begin(), m.size()), !firstMessage);
firstMessage = false;
}

View File

@ -853,7 +853,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
}
}
ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, vector<WorkerInterface> remoteTLogWorkers, vector<WorkerInterface> logRouterWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, Version recoveryVersion, Tag minTag, int logNum )
ACTOR static Future<Void> newRemoteEpoch( TagPartitionedLogSystem* self, Reference<TagPartitionedLogSystem> oldLogSystem, vector<WorkerInterface> remoteTLogWorkers, vector<WorkerInterface> logRouterWorkers, DatabaseConfiguration configuration, LogEpoch recoveryCount, Tag minTag, int logNum )
{
//recruit temporary log routers and update registration with them
state int tempLogRouters = std::max<int>(logRouterWorkers.size(), SERVER_KNOBS->MIN_TAG - minTag + 1);
@ -883,8 +883,9 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
InitializeTLogRequest &req = remoteTLogReqs[i];
req.recruitmentID = remoteRecruitmentID;
req.storeType = configuration.tLogDataStoreType;
req.recoverFrom = self->getLogSystemConfig();
req.recoverAt = recoveryVersion;
req.recoverFrom = oldLogSystem->getLogSystemConfig();
req.recoverAt = oldLogSystem->epochEndVersion.get();
req.knownCommittedVersion = oldLogSystem->knownCommittedVersion;
req.epoch = recoveryCount;
req.remoteTag = SERVER_KNOBS->MAX_TAG + i;
}
@ -894,7 +895,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
self->tLogs[logNum]->updateLocalitySet(remoteTLogWorkers);
vector<int> locations;
for( Tag tag : self->epochEndTags ) {
for( Tag tag : oldLogSystem->epochEndTags ) {
locations.clear();
self->tLogs[logNum]->getPushLocations( vector<Tag>(1, tag), locations, 0 );
for(int loc : locations)
@ -997,7 +998,7 @@ struct TagPartitionedLogSystem : ILogSystem, ReferenceCounted<TagPartitionedLogS
for( int i = 0; i < logSystem->tLogs[0]->logServers.size(); i++)
recoveryComplete.push_back( transformErrors( throwErrorOr( logSystem->tLogs[0]->logServers[i]->get().interf().recoveryFinished.getReplyUnlessFailedFor( TLogRecoveryFinishedRequest(), SERVER_KNOBS->TLOG_TIMEOUT, SERVER_KNOBS->MASTER_FAILURE_SLOPE_DURING_RECOVERY ) ), master_recovery_failed() ) );
logSystem->recoveryComplete = waitForAll(recoveryComplete);
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), remoteTLogWorkers, logRouterWorkers, configuration, recoveryCount, oldLogSystem->epochEndVersion.get(), minTag, 1);
logSystem->remoteRecovery = TagPartitionedLogSystem::newRemoteEpoch(logSystem.getPtr(), oldLogSystem, remoteTLogWorkers, logRouterWorkers, configuration, recoveryCount, minTag, 1);
Void _ = wait(logSystem->remoteRecovery);
return logSystem;