ported peek metrics to old tlog 6.0

This commit is contained in:
Evan Tschannen 2020-04-22 23:35:48 -07:00
parent 0a1b2a572f
commit 91fba9106d
1 changed files with 111 additions and 1 deletions

View File

@ -384,6 +384,43 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
struct PeekTrackerData {
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
double lastUpdate;
Tag tag;
double lastLogged;
int64_t totalPeeks;
int64_t replyBytes;
int64_t duplicatePeeks;
double queueTime;
double queueMax;
double blockTime;
double blockMax;
double workTime;
double workMax;
int64_t unblockedPeeks;
double idleTime;
double idleMax;
PeekTrackerData() : lastUpdate(0) {
resetMetrics();
}
void resetMetrics() {
lastLogged = now();
totalPeeks = 0;
replyBytes = 0;
duplicatePeeks = 0;
queueTime = 0;
queueMax = 0;
blockTime = 0;
blockMax = 0;
workTime = 0;
workMax = 0;
unblockedPeeks = 0;
idleTime = 0;
idleMax = 0;
}
};
std::map<UID, PeekTrackerData> peekTracker;
@ -1032,6 +1069,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
if(req.sequence.present()) {
try {
@ -1042,6 +1080,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = req.tag;
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
@ -1057,8 +1096,16 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
throw timed_out();
}
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
if(fPrevPeekData.isReady()) {
trackerData.unblockedPeeks++;
double t = now() - trackerData.lastUpdate;
if(t > trackerData.idleMax) trackerData.idleMax = t;
trackerData.idleTime += t;
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield());
@ -1072,6 +1119,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
}
state double blockStart = now();
if( req.returnIfBlocked && logData->version.get() < req.begin ) {
req.reply.sendError(end_of_stream());
if(req.sequence.present()) {
@ -1106,6 +1155,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait(delay(0, TaskPriority::TLogSpilledPeekReply));
}
state double workStart = now();
Version poppedVer = poppedVersion(logData, req.tag);
if(poppedVer > req.begin) {
TLogPeekReply rep;
@ -1194,6 +1245,22 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
double queueT = blockStart-queueStart;
double blockT = workStart-blockStart;
double workT = now()-workStart;
trackerData.totalPeeks++;
trackerData.replyBytes += reply.messages.size();
if(queueT > trackerData.queueMax) trackerData.queueMax = queueT;
if(blockT > trackerData.blockMax) trackerData.blockMax = blockT;
if(workT > trackerData.workMax) trackerData.workMax = workT;
trackerData.queueTime += queueT;
trackerData.blockTime += blockT;
trackerData.workTime += workT;
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
@ -1202,6 +1269,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
return Void();
}
if(sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
@ -1537,6 +1605,47 @@ ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
}
}
ACTOR Future<Void> logPeekTrackers( LogData* logData ) {
loop {
int64_t logThreshold = 1;
if(logData->peekTracker.size() > SERVER_KNOBS->PEEK_LOGGING_AMOUNT) {
std::vector<int64_t> peekCounts;
peekCounts.reserve(logData->peekTracker.size());
for( auto& it : logData->peekTracker ) {
peekCounts.push_back(it.second.totalPeeks);
}
size_t pivot = peekCounts.size()-SERVER_KNOBS->PEEK_LOGGING_AMOUNT;
std::nth_element(peekCounts.begin(), peekCounts.begin()+pivot, peekCounts.end());
logThreshold = std::max<int64_t>(1,peekCounts[pivot]);
}
int logCount = 0;
for( auto& it : logData->peekTracker ) {
if(it.second.totalPeeks >= logThreshold) {
logCount++;
TraceEvent("PeekMetrics", logData->logId)
.detail("Tag", it.second.tag.toString())
.detail("Elapsed", now() - it.second.lastLogged)
.detail("MeanReplyBytes", it.second.replyBytes/it.second.totalPeeks)
.detail("TotalPeeks", it.second.totalPeeks)
.detail("UnblockedPeeks", it.second.unblockedPeeks)
.detail("DuplicatePeeks", it.second.duplicatePeeks)
.detail("Sequence", it.second.sequence_version.size() ? it.second.sequence_version.begin()->first : -1)
.detail("IdleSeconds", it.second.idleTime)
.detail("IdleMax", it.second.idleMax)
.detail("QueueSeconds", it.second.queueTime)
.detail("QueueMax", it.second.queueMax)
.detail("BlockSeconds", it.second.blockTime)
.detail("BlockMax", it.second.blockMax)
.detail("WorkSeconds", it.second.workTime)
.detail("WorkMax", it.second.workMax);
it.second.resetMetrics();
}
}
wait( delay(SERVER_KNOBS->PEEK_LOGGING_DELAY * std::max(1,logCount)) );
}
}
void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingMetricsRequest const& req ) {
TLogQueuingMetricsReply reply;
reply.localTime = now();
@ -1876,6 +1985,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
logData->addActor.send( logPeekTrackers(logData.getPtr()) );
if(!logData->isPrimary) {
std::vector<Tag> tags;