Merge pull request #3008 from etschannen/release-6.2

Added logging for parallel peeks from TLogs
Evan Tschannen 2020-04-23 10:27:49 -07:00 committed by GitHub
commit 76d4570d5d
8 changed files with 240 additions and 12 deletions

View File

@ -242,6 +242,7 @@ struct KeyRangeRef {
force_inline void serialize(Ar& ar) {
serializer(ar, const_cast<KeyRef&>(begin), const_cast<KeyRef&>(end));
if( begin > end ) {
TraceEvent("InvertedRange").detail("Begin", begin).detail("End", end);
throw inverted_range();
};
}

View File

@ -167,8 +167,8 @@ ACTOR Future<Void> failureMonitorClientLoop(
}
ACTOR Future<Void> failureMonitorClient( Reference<AsyncVar<Optional<struct ClusterInterface>>> ci, bool trackMyStatus ) {
TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::transport().isClient());
if (FlowTransport::transport().isClient()) {
TraceEvent("FailureMonitorStart").detail("IsClient", FlowTransport::isClient());
if (FlowTransport::isClient()) {
wait(Never());
}

View File

@ -295,7 +295,7 @@ static ReliablePacket* sendPacket( TransportData* self, Reference<Peer> peer, IS
ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
state Endpoint remotePingEndpoint({ peer->destination }, WLTOKEN_PING_PACKET);
loop {
-if (!FlowTransport::transport().isClient() && !peer->destination.isPublic() && peer->compatible) {
+if (!FlowTransport::isClient() && !peer->destination.isPublic() && peer->compatible) {
// Don't send ping messages to clients unless necessary. Instead monitor incoming client pings.
// We ignore this block for incompatible clients because pings from server would trigger the
// peer->resetPing and prevent 'connection_failed' due to ping timeout.
@ -324,7 +324,7 @@ ACTOR Future<Void> connectionMonitor( Reference<Peer> peer ) {
(peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_UNREFERENCED_CLOSE_DELAY)) {
// TODO: What about when peerReference == -1?
throw connection_unreferenced();
-} else if (FlowTransport::transport().isClient() && peer->compatible && peer->destination.isPublic() &&
+} else if (FlowTransport::isClient() && peer->compatible && peer->destination.isPublic() &&
(peer->lastConnectTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT) &&
(peer->lastDataPacketSentTime < now() - FLOW_KNOBS->CONNECTION_MONITOR_IDLE_TIMEOUT)) {
// First condition is necessary because we may get here if we are server.
@ -413,7 +413,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
self->outgoingConnectionIdle = true;
// Wait until there is something to send.
while (self->unsent.empty()) {
-if (FlowTransport::transport().isClient() && self->destination.isPublic() &&
+if (FlowTransport::isClient() && self->destination.isPublic() &&
clientReconnectDelay) {
break;
}
@ -434,7 +434,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
when( Reference<IConnection> _conn = wait( INetworkConnections::net()->connect(self->destination) ) ) {
conn = _conn;
wait(conn->connectHandshake());
-if (FlowTransport::transport().isClient()) {
+if (FlowTransport::isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(false));
}
if (self->unsent.empty()) {
@ -459,7 +459,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
throw;
}
TraceEvent("ConnectionTimedOut", conn ? conn->getDebugID() : UID()).suppressFor(1.0).detail("PeerAddr", self->destination);
-if (FlowTransport::transport().isClient()) {
+if (FlowTransport::isClient()) {
IFailureMonitor::failureMonitor().setStatus(self->destination, FailureStatus(true));
clientReconnectDelay = true;
}
@ -511,7 +511,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
if(self->destination.isPublic()
&& IFailureMonitor::failureMonitor().getState(self->destination).isAvailable()
&& !FlowTransport::transport().isClient())
&& !FlowTransport::isClient())
{
auto& it = self->transport->closedPeers[self->destination];
if(now() - it.second > FLOW_KNOBS->TOO_MANY_CONNECTIONS_CLOSED_RESET_DELAY) {
@ -526,7 +526,7 @@ ACTOR Future<Void> connectionKeeper( Reference<Peer> self,
}
if (conn) {
-clientReconnectDelay = FlowTransport::transport().isClient() && e.code() != error_code_connection_idle;
+clientReconnectDelay = FlowTransport::isClient() && e.code() != error_code_connection_idle;
conn->close();
conn = Reference<IConnection>();
}
@ -659,6 +659,9 @@ ACTOR static void deliver(TransportData* self, Endpoint destination, ArenaReader
} catch (Error& e) {
g_currentDeliveryPeerAddress = {NetworkAddress()};
TraceEvent(SevError, "ReceiverError").error(e).detail("Token", destination.token.toString()).detail("Peer", destination.getPrimaryAddress());
if(!FlowTransport::isClient()) {
flushAndExit(FDB_EXIT_ERROR);
}
throw;
}
} else if (destination.token.first() & TOKEN_STREAM_FLAG) {
@ -1107,7 +1110,7 @@ void FlowTransport::addPeerReference(const Endpoint& endpoint, bool isStream) {
Reference<Peer> peer = self->getOrOpenPeer(endpoint.getPrimaryAddress());
if(peer->peerReferences == -1) {
-if (FlowTransport::transport().isClient()) {
+if (FlowTransport::isClient()) {
IFailureMonitor::failureMonitor().setStatus(endpoint.getPrimaryAddress(), FailureStatus(false));
}
peer->peerReferences = 1;

View File

@ -83,6 +83,8 @@ ServerKnobs::ServerKnobs(bool randomize, ClientKnobs* clientKnobs, bool isSimula
init( TLOG_IGNORE_POP_AUTO_ENABLE_DELAY, 300.0 );
init( TXS_POPPED_MAX_DELAY, 1.0 ); if ( randomize && BUGGIFY ) TXS_POPPED_MAX_DELAY = deterministicRandom()->random01();
init( TLOG_MAX_CREATE_DURATION, 10.0 );
init( PEEK_LOGGING_AMOUNT, 5 );
init( PEEK_LOGGING_DELAY, 5.0 );
// disk snapshot max timeout, to be put in TLog, storage and coordinator nodes
init( SNAP_CREATE_MAX_TIMEOUT, 300.0 );
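
These two knobs drive the new logPeekTrackers loop later in this diff: PEEK_LOGGING_AMOUNT caps how many peek trackers are traced per pass, and the wait before the next pass scales with how many were actually logged, keeping the PeekMetrics rate near one event per PEEK_LOGGING_DELAY seconds. A toy sketch of that pacing (nextLoggingPass is an illustrative name, not FDB code):

// Sketch only: how the two knobs above pace PeekMetrics trace events.
#include <algorithm>
#include <cstdio>

double nextLoggingPass(double peekLoggingDelay, int logCount) {
	// Waiting longer when more trackers were logged keeps the overall
	// PeekMetrics event rate near one event per PEEK_LOGGING_DELAY seconds.
	return peekLoggingDelay * std::max(1, logCount);
}

int main() {
	std::printf("idle pass: wait %.1fs\n", nextLoggingPass(5.0, 0)); // 5.0
	std::printf("busy pass: wait %.1fs\n", nextLoggingPass(5.0, 5)); // 25.0
	return 0;
}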

View File

@ -85,6 +85,8 @@ public:
double TLOG_DEGRADED_DURATION;
double TXS_POPPED_MAX_DELAY;
double TLOG_MAX_CREATE_DURATION;
int PEEK_LOGGING_AMOUNT;
double PEEK_LOGGING_DELAY;
// Data distribution queue
double HEALTH_POLL_TIME;

View File

@ -384,6 +384,43 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
struct PeekTrackerData {
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
double lastUpdate;
Tag tag;
double lastLogged;
int64_t totalPeeks;
int64_t replyBytes;
int64_t duplicatePeeks;
double queueTime;
double queueMax;
double blockTime;
double blockMax;
double workTime;
double workMax;
int64_t unblockedPeeks;
double idleTime;
double idleMax;
PeekTrackerData() : lastUpdate(0) {
resetMetrics();
}
void resetMetrics() {
lastLogged = now();
totalPeeks = 0;
replyBytes = 0;
duplicatePeeks = 0;
queueTime = 0;
queueMax = 0;
blockTime = 0;
blockMax = 0;
workTime = 0;
workMax = 0;
unblockedPeeks = 0;
idleTime = 0;
idleMax = 0;
}
};
std::map<UID, PeekTrackerData> peekTracker;
@ -1032,6 +1069,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
if(req.sequence.present()) {
try {
@ -1042,6 +1080,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = req.tag;
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
@ -1057,8 +1096,16 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
throw timed_out();
}
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
if(fPrevPeekData.isReady()) {
trackerData.unblockedPeeks++;
double t = now() - trackerData.lastUpdate;
if(t > trackerData.idleMax) trackerData.idleMax = t;
trackerData.idleTime += t;
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield());
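
The block added above tracks two things before waiting on the previous peek in the sequence: how many peeks find their predecessor's result already available ("unblocked"), and how long the tracker sat idle since the last peek. A minimal standalone sketch of that accounting, using illustrative names (IdleStats, onPeekArrived, nowSeconds, and resultIsReady are not FDB identifiers):

// Sketch only: the unblocked-peek / idle-time accounting added above.
#include <algorithm>
#include <cstdio>

struct IdleStats {
	long long unblockedPeeks = 0;  // peeks whose predecessor had already finished
	double idleTime = 0;           // total seconds the tracker sat idle between peeks
	double idleMax = 0;            // longest single idle gap
	double lastUpdate = 0;         // time of the previous peek from this client
};

void onPeekArrived(IdleStats& s, bool resultIsReady, double nowSeconds) {
	if (resultIsReady) {
		// The peek does not have to block on the previous sequence number,
		// so the whole gap since the last peek counts as idle time.
		s.unblockedPeeks++;
		double gap = nowSeconds - s.lastUpdate;
		s.idleMax = std::max(s.idleMax, gap);
		s.idleTime += gap;
	}
	s.lastUpdate = nowSeconds;
}

int main() {
	IdleStats s;
	onPeekArrived(s, /*resultIsReady=*/true, 1.0);
	onPeekArrived(s, /*resultIsReady=*/true, 3.5);
	std::printf("unblocked=%lld idle=%.1f max=%.1f\n", s.unblockedPeeks, s.idleTime, s.idleMax);
	return 0;
}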
@ -1072,6 +1119,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
}
state double blockStart = now();
if( req.returnIfBlocked && logData->version.get() < req.begin ) {
req.reply.sendError(end_of_stream());
if(req.sequence.present()) {
@ -1106,6 +1155,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait(delay(0, TaskPriority::TLogSpilledPeekReply));
}
state double workStart = now();
Version poppedVer = poppedVersion(logData, req.tag);
if(poppedVer > req.begin) {
TLogPeekReply rep;
@ -1194,6 +1245,22 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
double queueT = blockStart-queueStart;
double blockT = workStart-blockStart;
double workT = now()-workStart;
trackerData.totalPeeks++;
trackerData.replyBytes += reply.messages.size();
if(queueT > trackerData.queueMax) trackerData.queueMax = queueT;
if(blockT > trackerData.blockMax) trackerData.blockMax = blockT;
if(workT > trackerData.workMax) trackerData.workMax = workT;
trackerData.queueTime += queueT;
trackerData.blockTime += blockT;
trackerData.workTime += workT;
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
@ -1202,6 +1269,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
return Void();
}
if(sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
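
In the hunks above, each peek's latency is split into three phases from the timestamps recorded earlier in the actor: queue time (blockStart - queueStart, waiting behind the previous sequence number), block time (workStart - blockStart, waiting for the requested version to exist), and work time (building the reply), each accumulated as a running total plus a per-phase maximum. A hedged sketch of the same sum-plus-max pattern outside the actor framework (PhaseStats, recordPeekTiming, and replySent are illustrative names):

// Sketch only: the queue/block/work phase accounting used by the peek tracker above.
#include <algorithm>

struct PhaseStats {
	double queueTime = 0, queueMax = 0;  // waiting behind the previous sequence number
	double blockTime = 0, blockMax = 0;  // waiting for the requested version to exist
	double workTime = 0, workMax = 0;    // assembling and serializing the reply
	long long totalPeeks = 0;
};

void recordPeekTiming(PhaseStats& s, double queueStart, double blockStart,
                      double workStart, double replySent) {
	double queueT = blockStart - queueStart;
	double blockT = workStart - blockStart;
	double workT = replySent - workStart;
	s.totalPeeks++;
	s.queueMax = std::max(s.queueMax, queueT);
	s.blockMax = std::max(s.blockMax, blockT);
	s.workMax = std::max(s.workMax, workT);
	s.queueTime += queueT;
	s.blockTime += blockT;
	s.workTime += workT;
}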
@ -1537,6 +1605,47 @@ ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
}
}
ACTOR Future<Void> logPeekTrackers( LogData* logData ) {
loop {
int64_t logThreshold = 1;
if(logData->peekTracker.size() > SERVER_KNOBS->PEEK_LOGGING_AMOUNT) {
std::vector<int64_t> peekCounts;
peekCounts.reserve(logData->peekTracker.size());
for( auto& it : logData->peekTracker ) {
peekCounts.push_back(it.second.totalPeeks);
}
size_t pivot = peekCounts.size()-SERVER_KNOBS->PEEK_LOGGING_AMOUNT;
std::nth_element(peekCounts.begin(), peekCounts.begin()+pivot, peekCounts.end());
logThreshold = std::max<int64_t>(1,peekCounts[pivot]);
}
int logCount = 0;
for( auto& it : logData->peekTracker ) {
if(it.second.totalPeeks >= logThreshold) {
logCount++;
TraceEvent("PeekMetrics", logData->logId)
.detail("Tag", it.second.tag.toString())
.detail("Elapsed", now() - it.second.lastLogged)
.detail("MeanReplyBytes", it.second.replyBytes/it.second.totalPeeks)
.detail("TotalPeeks", it.second.totalPeeks)
.detail("UnblockedPeeks", it.second.unblockedPeeks)
.detail("DuplicatePeeks", it.second.duplicatePeeks)
.detail("Sequence", it.second.sequence_version.size() ? it.second.sequence_version.begin()->first : -1)
.detail("IdleSeconds", it.second.idleTime)
.detail("IdleMax", it.second.idleMax)
.detail("QueueSeconds", it.second.queueTime)
.detail("QueueMax", it.second.queueMax)
.detail("BlockSeconds", it.second.blockTime)
.detail("BlockMax", it.second.blockMax)
.detail("WorkSeconds", it.second.workTime)
.detail("WorkMax", it.second.workMax);
it.second.resetMetrics();
}
}
wait( delay(SERVER_KNOBS->PEEK_LOGGING_DELAY * std::max(1,logCount)) );
}
}
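
The new logPeekTrackers loop traces only the busiest peek clients: when more than PEEK_LOGGING_AMOUNT trackers exist, std::nth_element finds the peek count at position size - PEEK_LOGGING_AMOUNT, and only trackers at or above that threshold emit a PeekMetrics event. A standalone sketch of that top-N selection (the knob value and sample counts are hard-coded here for illustration):

// Sketch only: how the logging threshold above picks the top-N busiest trackers.
#include <algorithm>
#include <cstdio>
#include <vector>

int main() {
	const size_t PEEK_LOGGING_AMOUNT = 5;  // default knob value above
	std::vector<long long> peekCounts = { 2, 40, 7, 91, 3, 55, 12, 8, 1, 66 };

	long long logThreshold = 1;
	if (peekCounts.size() > PEEK_LOGGING_AMOUNT) {
		size_t pivot = peekCounts.size() - PEEK_LOGGING_AMOUNT;
		// Partially sort so element `pivot` holds the (pivot+1)-th smallest count;
		// everything at or after it belongs to the top PEEK_LOGGING_AMOUNT trackers.
		std::nth_element(peekCounts.begin(), peekCounts.begin() + pivot, peekCounts.end());
		logThreshold = std::max<long long>(1, peekCounts[pivot]);
	}
	// Only trackers with totalPeeks >= logThreshold would be traced.
	std::printf("logThreshold = %lld\n", logThreshold);  // prints 12 for this sample
	return 0;
}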
void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingMetricsRequest const& req ) {
TLogQueuingMetricsReply reply;
reply.localTime = now();
@ -1876,6 +1985,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
logData->addActor.send( logPeekTrackers(logData.getPtr()) );
if(!logData->isPrimary) {
std::vector<Tag> tags;

View File

@ -483,6 +483,43 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
struct PeekTrackerData {
std::map<int, Promise<std::pair<Version, bool>>> sequence_version;
double lastUpdate;
Tag tag;
double lastLogged;
int64_t totalPeeks;
int64_t replyBytes;
int64_t duplicatePeeks;
double queueTime;
double queueMax;
double blockTime;
double blockMax;
double workTime;
double workMax;
int64_t unblockedPeeks;
double idleTime;
double idleMax;
PeekTrackerData() : lastUpdate(0) {
resetMetrics();
}
void resetMetrics() {
lastLogged = now();
totalPeeks = 0;
replyBytes = 0;
duplicatePeeks = 0;
queueTime = 0;
queueMax = 0;
blockTime = 0;
blockMax = 0;
workTime = 0;
workMax = 0;
unblockedPeeks = 0;
idleTime = 0;
idleMax = 0;
}
};
std::map<UID, PeekTrackerData> peekTracker;
@ -1335,6 +1372,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
state BinaryWriter messages2(Unversioned());
state int sequence = -1;
state UID peekId;
state double queueStart = now();
if(req.sequence.present()) {
try {
@ -1345,6 +1383,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
auto& trackerData = logData->peekTracker[peekId];
if (sequence == 0 && trackerData.sequence_version.find(0) == trackerData.sequence_version.end()) {
trackerData.tag = req.tag;
trackerData.sequence_version[0].send(std::make_pair(req.begin, req.onlySpilled));
}
auto seqBegin = trackerData.sequence_version.begin();
@ -1361,8 +1400,15 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
throw timed_out();
}
Future<std::pair<Version, bool>> fPrevPeekData = trackerData.sequence_version[sequence].getFuture();
if(fPrevPeekData.isReady()) {
trackerData.unblockedPeeks++;
double t = now() - trackerData.lastUpdate;
if(t > trackerData.idleMax) trackerData.idleMax = t;
trackerData.idleTime += t;
}
trackerData.lastUpdate = now();
std::pair<Version, bool> prevPeekData = wait(trackerData.sequence_version[sequence].getFuture());
std::pair<Version, bool> prevPeekData = wait(fPrevPeekData);
req.begin = prevPeekData.first;
req.onlySpilled = prevPeekData.second;
wait(yield());
@ -1376,6 +1422,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
}
}
state double blockStart = now();
if( req.returnIfBlocked && logData->version.get() < req.begin ) {
req.reply.sendError(end_of_stream());
if(req.sequence.present()) {
@ -1410,6 +1458,8 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
wait(delay(0, TaskPriority::TLogSpilledPeekReply));
}
state double workStart = now();
Version poppedVer = poppedVersion(logData, req.tag);
if(poppedVer > req.begin) {
TLogPeekReply rep;
@ -1585,6 +1635,22 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
if(req.sequence.present()) {
auto& trackerData = logData->peekTracker[peekId];
trackerData.lastUpdate = now();
double queueT = blockStart-queueStart;
double blockT = workStart-blockStart;
double workT = now()-workStart;
trackerData.totalPeeks++;
trackerData.replyBytes += reply.messages.size();
if(queueT > trackerData.queueMax) trackerData.queueMax = queueT;
if(blockT > trackerData.blockMax) trackerData.blockMax = blockT;
if(workT > trackerData.workMax) trackerData.workMax = workT;
trackerData.queueTime += queueT;
trackerData.blockTime += blockT;
trackerData.workTime += workT;
auto& sequenceData = trackerData.sequence_version[sequence+1];
if(trackerData.sequence_version.size() && sequence+1 < trackerData.sequence_version.begin()->first) {
req.reply.sendError(timed_out());
@ -1593,6 +1659,7 @@ ACTOR Future<Void> tLogPeekMessages( TLogData* self, TLogPeekRequest req, Refere
return Void();
}
if(sequenceData.isSet()) {
trackerData.duplicatePeeks++;
if(sequenceData.getFuture().get().first != reply.end) {
TEST(true); //tlog peek second attempt ended at a different version
req.reply.sendError(timed_out());
@ -1929,6 +1996,47 @@ ACTOR Future<Void> cleanupPeekTrackers( LogData* logData ) {
}
}
ACTOR Future<Void> logPeekTrackers( LogData* logData ) {
loop {
int64_t logThreshold = 1;
if(logData->peekTracker.size() > SERVER_KNOBS->PEEK_LOGGING_AMOUNT) {
std::vector<int64_t> peekCounts;
peekCounts.reserve(logData->peekTracker.size());
for( auto& it : logData->peekTracker ) {
peekCounts.push_back(it.second.totalPeeks);
}
size_t pivot = peekCounts.size()-SERVER_KNOBS->PEEK_LOGGING_AMOUNT;
std::nth_element(peekCounts.begin(), peekCounts.begin()+pivot, peekCounts.end());
logThreshold = std::max<int64_t>(1,peekCounts[pivot]);
}
int logCount = 0;
for( auto& it : logData->peekTracker ) {
if(it.second.totalPeeks >= logThreshold) {
logCount++;
TraceEvent("PeekMetrics", logData->logId)
.detail("Tag", it.second.tag.toString())
.detail("Elapsed", now() - it.second.lastLogged)
.detail("MeanReplyBytes", it.second.replyBytes/it.second.totalPeeks)
.detail("TotalPeeks", it.second.totalPeeks)
.detail("UnblockedPeeks", it.second.unblockedPeeks)
.detail("DuplicatePeeks", it.second.duplicatePeeks)
.detail("Sequence", it.second.sequence_version.size() ? it.second.sequence_version.begin()->first : -1)
.detail("IdleSeconds", it.second.idleTime)
.detail("IdleMax", it.second.idleMax)
.detail("QueueSeconds", it.second.queueTime)
.detail("QueueMax", it.second.queueMax)
.detail("BlockSeconds", it.second.blockTime)
.detail("BlockMax", it.second.blockMax)
.detail("WorkSeconds", it.second.workTime)
.detail("WorkMax", it.second.workMax);
it.second.resetMetrics();
}
}
wait( delay(SERVER_KNOBS->PEEK_LOGGING_DELAY * std::max(1,logCount)) );
}
}
void getQueuingMetrics( TLogData* self, Reference<LogData> logData, TLogQueuingMetricsRequest const& req ) {
TLogQueuingMetricsReply reply;
reply.localTime = now();
@ -2278,6 +2386,7 @@ ACTOR Future<Void> tLogCore( TLogData* self, Reference<LogData> logData, TLogInt
logData->addActor.send( traceCounters("TLogMetrics", logData->logId, SERVER_KNOBS->STORAGE_LOGGING_DELAY, &logData->cc, logData->logId.toString() + "/TLogMetrics"));
logData->addActor.send( serveTLogInterface(self, tli, logData, warningCollectorInput) );
logData->addActor.send( cleanupPeekTrackers(logData.getPtr()) );
logData->addActor.send( logPeekTrackers(logData.getPtr()) );
if(!logData->isPrimary) {
std::vector<Tag> tags;

View File

@ -80,6 +80,7 @@ struct WorkerInterface {
logRouter.getEndpoint( TaskPriority::Worker );
debugPing.getEndpoint( TaskPriority::Worker );
coordinationPing.getEndpoint( TaskPriority::Worker );
eventLogRequest.getEndpoint( TaskPriority::Worker );
}
template <class Ar>