From 92baea660913e112a963fd3a2cca41cb9721924c Mon Sep 17 00:00:00 2001 From: Vishesh Yadav Date: Thu, 10 Oct 2024 03:48:28 +0000 Subject: [PATCH] Refactor --- fdbrpc/FlowTransport.actor.cpp | 35 ++++++++++++++++++++++++---------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/fdbrpc/FlowTransport.actor.cpp b/fdbrpc/FlowTransport.actor.cpp index cab1a22e97..d3a0976403 100644 --- a/fdbrpc/FlowTransport.actor.cpp +++ b/fdbrpc/FlowTransport.actor.cpp @@ -365,14 +365,20 @@ public: }; std::deque connectionHistory; Future connectionHistoryLoggerF; + Reference connectionLogWriterThread; }; struct ConnectionLogWriter : IThreadPoolReceiver { - const std::string base_dir = FLOW_KNOBS->CONNECTION_LOG_DIRECTORY; - std::string fileName = base_dir + "fdb-connection-log-" + std::to_string(now()) + ".csv"; + const std::string baseDir; + std::string fileName; std::fstream file; - virtual ~ConnectionLogWriter() { file.close(); } + ConnectionLogWriter(const std::string baseDir) : baseDir(baseDir) {} + + virtual ~ConnectionLogWriter() { + if (file.is_open()) + file.close(); + } struct AppendAction : TypedAction { std::string localAddr; @@ -383,7 +389,17 @@ struct ConnectionLogWriter : IThreadPoolReceiver { double getTimeEstimate() const { return 2; } }; - void init() {} + std::string newFileName() const { + return baseDir + "fdb-connection-log-" + time_str() + ".csv"; + } + + void init() { + fileName = newFileName(); + } + + std::string time_str() const { + return std::to_string(now()); + } void openOrRoll() { if (!file.is_open()) { @@ -398,7 +414,7 @@ struct ConnectionLogWriter : IThreadPoolReceiver { if (file.tellg() > 100 * 1024 * 1024 /* 100 MB */) { file.close(); - fileName = base_dir + "fdb-connection-log-" + std::to_string(now()) + ".csv"; + fileName = newFileName(); TraceEvent("RollConnectionLog").detail("FileName", fileName); openOrRoll(); } @@ -427,14 +443,13 @@ ACTOR Future connectionHistoryLogger(TransportData* self) { state Future next = Void(); // One thread ensures async serialized execution on the log file. - state Reference pool; if (g_network->isSimulated()) { - pool = Reference(new DummyThreadPool()); + self->connectionLogWriterThread = Reference(new DummyThreadPool()); } else { - pool = createGenericThreadPool(); + self->connectionLogWriterThread = createGenericThreadPool(); } - pool->addThread(new ConnectionLogWriter(), "fdb-connection-log-write"); + self->connectionLogWriterThread->addThread(new ConnectionLogWriter(FLOW_KNOBS->CONNECTION_LOG_DIRECTORY)); loop { wait(next); next = delay(FLOW_KNOBS->LOG_CONNECTION_INTERVAL_SECS); @@ -444,7 +459,7 @@ ACTOR Future connectionHistoryLogger(TransportData* self) { std::string localAddr = FlowTransport::getGlobalLocalAddress().toString(); auto action = new ConnectionLogWriter::AppendAction(localAddr, std::move(self->connectionHistory)); ASSERT(action != nullptr); - pool->post(action); + self->connectionLogWriterThread->post(action); ASSERT(self->connectionHistory.size() == 0); } }