Copy the same set of changes to OldTLogServer_6_0

This commit is contained in:
Alex Miller 2019-07-09 18:20:20 -07:00
parent fd769ad878
commit 2c7007d662
1 changed files with 23 additions and 14 deletions

View File

@ -264,12 +264,6 @@ struct TLogData : NonCopyable {
int64_t overheadBytesInput;
int64_t overheadBytesDurable;
struct PeekTrackerData {
std::map<int, Promise<Version>> sequence_version;
double lastUpdate;
};
std::map<UID, PeekTrackerData> peekTracker;
WorkerCache<TLogInterface> tlogCache;
PromiseStream<Future<Void>> sharedActors;
@ -383,6 +377,13 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Version queueCommittingVersion;
Version knownCommittedVersion, durableKnownCommittedVersion, minKnownCommittedVersion;
struct PeekTrackerData {
std::map<int, Promise<Version>> sequence_version;
double lastUpdate;
};
std::map<UID, PeekTrackerData> peekTracker;
Deque<std::pair<Version, Standalone<VectorRef<uint8_t>>>> messageBlocks;
std::vector<std::vector<Reference<TagData>>> tag_data; //tag.locality | tag.id
int unpoppedRecoveredTags;
@ -488,6 +489,14 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
tLogData->persistentData->clear( KeyRangeRef( poppedKey, strinc(poppedKey) ) );
}
for ( auto it = peekTracker.begin(); it != peekTracker.end(); ++it ) {
for(auto seq : it->second.sequence_version) {
if(!seq.second.isSet()) {
seq.second.sendError(timed_out());
}
}
}
}
LogEpoch epoch() const { return recoveryCount; }
@ -1015,7 +1024,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
peekId = req.sequence.get().first;
sequence = req.sequence.get().second;
if(sequence > 0) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = logData->peekTracker[peekId];
auto seqBegin = trackerData.sequence_version.begin();
while(trackerData.sequence_version.size() && seqBegin->first <= sequence - SERVER_KNOBS->PARALLEL_GET_MORE_REQUESTS) {
if(seqBegin->second.canBeSet()) {
@ -1081,7 +1090,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
rep.onlySpilled = false;
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
@ -1155,7 +1164,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
//TraceEvent("TlogPeek", self->dbgid).detail("LogId", logData->logId).detail("EndVer", reply.end).detail("MsgBytes", reply.messages.expectedSize()).detail("ForAddress", req.reply.getEndpoint().address);
if(req.sequence.present()) {
auto& trackerData = self->peekTracker[peekId];
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
@ -1732,11 +1741,11 @@ ACTOR Future<Void> respondToRecovered( TLogInterface tli, Promise<Void> recovery
}
}
ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
loop {
double minTimeUntilExpiration = SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME;
auto it = self->peekTracker.begin();
while(it != self->peekTracker.end()) {
auto it = logData->peekTracker.begin();
while(it != logData->peekTracker.end()) {
double timeUntilExpiration = it->second.lastUpdate + SERVER_KNOBS->PEEK_TRACKER_EXPIRATION_TIME - now();
if(timeUntilExpiration < 1.0e-6) {
for(auto seq : it->second.sequence_version) {
@ -1744,7 +1753,7 @@ ACTOR Future<Void> cleanupPeekTrackers( TLogData* self ) {
seq.second.sendError(timed_out());
}
}
it = self->peekTracker.erase(it);
it = logData->peekTracker.erase(it);
} else {
minTimeUntilExpiration = std::min(minTimeUntilExpiration, timeUntilExpiration);
++it;
@ -1995,6 +2004,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
//FIXME: update tlogMetrics to include new information, or possibly only have one copy for the shared instance
logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
if(!logData->isPrimary) {
std::vector<Tag> tags;
@ -2446,7 +2456,6 @@ ACTOR Future<Void> tLog( IKeyValueStore* persistentData, IDiskQueue* persistentQ
if(recovered.canBeSet()) recovered.send(Void());
self.sharedActors.send( cleanupPeekTrackers(&self) );
self.sharedActors.send( commitQueue(&self) );
self.sharedActors.send( updateStorageLoop(&self) );