Refactor
This commit is contained in:
parent
b7fbe20f29
commit
92baea6609
|
@ -365,14 +365,20 @@ public:
|
|||
};
|
||||
std::deque<ConnectionHistoryEntry> connectionHistory;
|
||||
Future<Void> connectionHistoryLoggerF;
|
||||
Reference<IThreadPool> connectionLogWriterThread;
|
||||
};
|
||||
|
||||
struct ConnectionLogWriter : IThreadPoolReceiver {
|
||||
const std::string base_dir = FLOW_KNOBS->CONNECTION_LOG_DIRECTORY;
|
||||
std::string fileName = base_dir + "fdb-connection-log-" + std::to_string(now()) + ".csv";
|
||||
const std::string baseDir;
|
||||
std::string fileName;
|
||||
std::fstream file;
|
||||
|
||||
virtual ~ConnectionLogWriter() { file.close(); }
|
||||
ConnectionLogWriter(const std::string baseDir) : baseDir(baseDir) {}
|
||||
|
||||
virtual ~ConnectionLogWriter() {
|
||||
if (file.is_open())
|
||||
file.close();
|
||||
}
|
||||
|
||||
struct AppendAction : TypedAction<ConnectionLogWriter, AppendAction> {
|
||||
std::string localAddr;
|
||||
|
@ -383,7 +389,17 @@ struct ConnectionLogWriter : IThreadPoolReceiver {
|
|||
double getTimeEstimate() const { return 2; }
|
||||
};
|
||||
|
||||
void init() {}
|
||||
std::string newFileName() const {
|
||||
return baseDir + "fdb-connection-log-" + time_str() + ".csv";
|
||||
}
|
||||
|
||||
void init() {
|
||||
fileName = newFileName();
|
||||
}
|
||||
|
||||
std::string time_str() const {
|
||||
return std::to_string(now());
|
||||
}
|
||||
|
||||
void openOrRoll() {
|
||||
if (!file.is_open()) {
|
||||
|
@ -398,7 +414,7 @@ struct ConnectionLogWriter : IThreadPoolReceiver {
|
|||
|
||||
if (file.tellg() > 100 * 1024 * 1024 /* 100 MB */) {
|
||||
file.close();
|
||||
fileName = base_dir + "fdb-connection-log-" + std::to_string(now()) + ".csv";
|
||||
fileName = newFileName();
|
||||
TraceEvent("RollConnectionLog").detail("FileName", fileName);
|
||||
openOrRoll();
|
||||
}
|
||||
|
@ -427,14 +443,13 @@ ACTOR Future<Void> connectionHistoryLogger(TransportData* self) {
|
|||
state Future<Void> next = Void();
|
||||
|
||||
// One thread ensures async serialized execution on the log file.
|
||||
state Reference<IThreadPool> pool;
|
||||
if (g_network->isSimulated()) {
|
||||
pool = Reference<IThreadPool>(new DummyThreadPool());
|
||||
self->connectionLogWriterThread = Reference<IThreadPool>(new DummyThreadPool());
|
||||
} else {
|
||||
pool = createGenericThreadPool();
|
||||
self->connectionLogWriterThread = createGenericThreadPool();
|
||||
}
|
||||
pool->addThread(new ConnectionLogWriter(), "fdb-connection-log-write");
|
||||
|
||||
self->connectionLogWriterThread->addThread(new ConnectionLogWriter(FLOW_KNOBS->CONNECTION_LOG_DIRECTORY));
|
||||
loop {
|
||||
wait(next);
|
||||
next = delay(FLOW_KNOBS->LOG_CONNECTION_INTERVAL_SECS);
|
||||
|
@ -444,7 +459,7 @@ ACTOR Future<Void> connectionHistoryLogger(TransportData* self) {
|
|||
std::string localAddr = FlowTransport::getGlobalLocalAddress().toString();
|
||||
auto action = new ConnectionLogWriter::AppendAction(localAddr, std::move(self->connectionHistory));
|
||||
ASSERT(action != nullptr);
|
||||
pool->post(action);
|
||||
self->connectionLogWriterThread->post(action);
|
||||
ASSERT(self->connectionHistory.size() == 0);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue