Merge pull request #1818 from alexmiller-apple/peek-cursor-timeout-bug

Fix parallel peek stalling for 10min when a TLog generation is destroyed
This commit is contained in:
Evan Tschannen 2019-07-19 16:39:31 -07:00 committed by GitHub
commit 6d694cc2ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 114 additions and 71 deletions

View File

@ -265,12 +265,6 @@ struct TLogData : NonCopyable {
int64_t overheadBytesInput;
int64_t overheadBytesDurable;
// Bookkeeping for one sequenced peek stream; instances are stored in peekTracker keyed by the peek's UID.
struct PeekTrackerData {
// Keyed by request sequence number. tLogPeekMessages for sequence N waits on entry N to learn the
// (begin version, onlySpilled) pair it should use; entry N+1 is populated for the following request.
// Entries that can no longer be satisfied are failed with timed_out().
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
// now() at the most recent request that touched this tracker. cleanupPeekTrackers drops the tracker
// once PEEK_TRACKER_EXPIRATION_TIME has elapsed since this timestamp.
double lastUpdate;
};
std::map<UID, PeekTrackerData> peekTracker;
WorkerCache<TLogInterface> tlogCache;
PromiseStream<Future<Void>> sharedActors;
@ -384,6 +378,13 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Version queueCommittingVersion;
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
// Bookkeeping for one sequenced peek stream; instances are stored in peekTracker keyed by the peek's UID.
struct PeekTrackerData {
// Keyed by request sequence number. tLogPeekMessages for sequence N waits on entry N to learn the
// (begin version, onlySpilled) pair it should use; entry N+1 is populated for the following request.
// Entries that can no longer be satisfied are failed with timed_out().
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
// now() at the most recent request that touched this tracker. cleanupPeekTrackers drops the tracker
// once PEEK_TRACKER_EXPIRATION_TIME has elapsed since this timestamp.
double lastUpdate;
};
std::map<UID, PeekTrackerData> peekTracker;
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
int unpoppedRecoveredTags;
@ -491,6 +492,14 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
tLogData->persistentData->clear( KeyRangeRef( poppedKey, strinc(poppedKey) ) );
}
for ( auto it = peekTracker.begin(); it != peekTracker.end(); ++it ) {
for(auto seq : it->second.sequence_version) {
if(!seq.second.isSet()) {
seq.second.sendError(timed_out());
}
}
}
}
LogEpoch epoch() const { return recoveryCount; }
@ -1020,27 +1029,28 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if(sequence > 0) {
auto& trackerData = self->peekTracker[peekId];
auto seqBegin = trackerData.sequence_version.begin();
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield());
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield());
} catch( Error &e ) {
if(e.code() == error_code_timed_out) {
req.reply.sendError(timed_out());
@ -1053,6 +1063,13 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if( req.returnIfBlocked && logData->version.get() < req.begin ) {
req.reply.sendError(end_of_stream());
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence+1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
}
}
return Void();
}
@ -1088,13 +1105,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
rep.onlySpilled = false;
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
if (!sequenceData.isSet())
sequenceData.sendError(timed_out());
return Void();
}
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version
@ -1162,13 +1181,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address);
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
if(!sequenceData.isSet())
sequenceData.sendError(timed_out());
return Void();
}
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
@ -1546,7 +1567,7 @@ ACTOR Future<Void> tLogCommit(
}
state Version execVersion = invalidVersion;
state ExecCmdValueString execArg();
state ExecCmdValueString execArg;
state TLogQueueEntryRef qe;
state StringRef execCmd;
state Standalone<VectorRef<Tag>> execTags;
@ -1740,11 +1761,11 @@ ACTOR Future<Void> respondToRecovered( TLogInterface tli, Promise<Void> recovery
}
}
ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
loop {
double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
auto it = self->peekTracker.begin();
while(it != self->peekTracker.end()) {
auto it = logData->peekTracker.begin();
while(it != logData->peekTracker.end()) {
double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now();
if(timeUntilExpiration < 1.0e-6) {
for(auto seq : it->second.sequence_version) {
@ -1752,7 +1773,7 @@ ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
seq.second.sendError(timed_out());
}
}
it = self->peekTracker.erase(it);
it = logData->peekTracker.erase(it);
} else {
minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration);
++it;
@ -2003,6 +2024,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
//FIXME: update tlogMetrics to include new information, or possibly only have one copy for the shared instance
logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
if(!logData->isPrimary) {
std::vector<Tag> tags;
@ -2460,7 +2482,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
if(recovered.canBeSet()) recovered.send(Void());
self.sharedActors.send( cleanupPeekTrackers(&self) );
self.sharedActors.send( commitQueue(&self) );
self.sharedActors.send( updateStorageLoop(&self) );

View File

@ -315,12 +315,6 @@ struct TLogData : NonCopyable {
int64_t overheadBytesInput;
int64_t overheadBytesDurable;
// Bookkeeping for one sequenced peek stream; instances are stored in peekTracker keyed by the peek's UID.
struct PeekTrackerData {
// Keyed by request sequence number. tLogPeekMessages for sequence N waits on entry N to learn the
// (begin version, onlySpilled) pair it should use; entry N+1 is populated for the following request.
// Entries that can no longer be satisfied are failed with timed_out().
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
// now() at the most recent request that touched this tracker. cleanupPeekTrackers drops the tracker
// once PEEK_TRACKER_EXPIRATION_TIME has elapsed since this timestamp.
double lastUpdate;
};
std::map<UID, PeekTrackerData> peekTracker;
WorkerCache<TLogInterface> tlogCache;
FlowLock peekMemoryLimiter;
@ -481,6 +475,13 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Promise<Void> recoveryComplete, committingQueue;
Version unrecoveredBefore, recoveredAt;
// Bookkeeping for one sequenced peek stream; instances are stored in peekTracker keyed by the peek's UID.
struct PeekTrackerData {
// Keyed by request sequence number. tLogPeekMessages for sequence N waits on entry N to learn the
// (begin version, onlySpilled) pair it should use; entry N+1 is populated for the following request.
// Entries that can no longer be satisfied are failed with timed_out().
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
// now() at the most recent request that touched this tracker. cleanupPeekTrackers drops the tracker
// once PEEK_TRACKER_EXPIRATION_TIME has elapsed since this timestamp.
double lastUpdate;
};
std::map<UID, PeekTrackerData> peekTracker;
Reference<AsyncVar<Reference<ILogSystem>>> logSystem;
Tag remoteTag;
bool isPrimary;
@ -555,6 +556,14 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
tLogData->persistentData->clear( KeyRangeRef( poppedKey, strinc(poppedKey) ) );
}
for ( auto it = peekTracker.begin(); it != peekTracker.end(); ++it ) {
for(auto seq : it->second.sequence_version) {
if(!seq.second.isSet()) {
seq.second.sendError(timed_out());
}
}
}
}
LogEpoch epoch() const { return recoveryCount; }
@ -1307,27 +1316,29 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
try {
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if(sequence > 0) {
auto& trackerData = self->peekTracker[peekId];
auto seqBegin = trackerData.sequence_version.begin();
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield());
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
// The peek cursor and this comparison need to agree about the maximum number of in-flight requests.
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
seqBegin->second.sendError(timed_out());
}
trackerData.sequence_version.erase(seqBegin);
seqBegin = trackerData.sequence_version.begin();
}
if(trackerData.sequence_version.size() && sequence < seqBegin->first) {
throw timed_out();
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield());
} catch( Error &e ) {
if(e.code() == error_code_timed_out) {
req.reply.sendError(timed_out());
@ -1340,6 +1351,13 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if( req.returnIfBlocked && logData->version.get() < req.begin ) {
req.reply.sendError(end_of_stream());
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence+1];
if (!sequenceData.isSet()) {
sequenceData.send(std::make_pair(req.begin, req.onlySpilled));
}
}
return Void();
}
@ -1375,13 +1393,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
rep.onlySpilled = false;
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = logData->peekTracker[peekId];
auto& sequenceData = trackerData.sequence_version[sequence+1];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
if (!sequenceData.isSet())
sequenceData.sendError(timed_out());
return Void();
}
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != rep.end) {
TEST(true); //tlog peek second attempt ended at a different version
@ -1536,13 +1556,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().getPrimaryAddress());
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
if(!sequenceData.isSet())
sequenceData.sendError(timed_out());
return Void();
}
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(sequenceData.isSet()) {
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
@ -2117,11 +2139,11 @@ ACTOR Future<Void> respondToRecovered( TLogInterface tli, Promise<Void> recovery
}
}
ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
loop {
double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
auto it = self->peekTracker.begin();
while(it != self->peekTracker.end()) {
auto it = logData->peekTracker.begin();
while(it != logData->peekTracker.end()) {
double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now();
if(timeUntilExpiration < 1.0e-6) {
for(auto seq : it->second.sequence_version) {
@ -2129,7 +2151,7 @@ ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
seq.second.sendError(timed_out());
}
}
it = self->peekTracker.erase(it);
it = logData->peekTracker.erase(it);
} else {
minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration);
++it;
@ -2388,6 +2410,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
//FIXME: update tlogMetrics to include new information, or possibly only have one copy for the shared instance
logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
if(!logData->isPrimary) {
std::vector<Tag> tags;
@ -2869,7 +2892,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
if(recovered.canBeSet()) recovered.send(Void());
self.sharedActors.send( cleanupPeekTrackers(&self) );
self.sharedActors.send( commitQueue(&self) );
self.sharedActors.send( updateStorageLoop(&self) );