Merge pull request #429 from ajbeamon/trace-log-refactor

Trace log refactor
This commit is contained in:
Steve Atherton 2018-06-27 14:52:09 -07:00 committed by GitHub
commit cbcf5177eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1088 additions and 795 deletions

View File

@ -68,13 +68,13 @@ ACTOR Future<WorkerInterface> getMasterWorker( Database cx, Reference<AsyncVar<S
ACTOR Future<int64_t> getDataInFlight( Database cx, WorkerInterface masterWorker ) {
try {
TraceEvent("DataInFlight").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
Standalone<StringRef> md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
TraceEventFields md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( StringRef( cx->dbName.toString() + "/TotalDataInFlight" ) ) ), 1.0 ) );
int64_t dataInFlight;
sscanf(extractAttribute(md.toString(), "TotalBytes").c_str(), "%lld", &dataInFlight);
sscanf(md.getValue("TotalBytes").c_str(), "%lld", &dataInFlight);
return dataInFlight;
} catch( Error &e ) {
TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract DataInFlight");
TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract DataInFlight").error(e);
throw;
}
@ -89,13 +89,13 @@ ACTOR Future<int64_t> getDataInFlight( Database cx, Reference<AsyncVar<ServerDBI
}
//Computes the queue size for storage servers and tlogs using the bytesInput and bytesDurable attributes
// Each attribute is parsed as "<double> <double> <integer>" (rate, roughness, byte count). Only the
// byte counts are used: the result is inputBytes - durableBytes.
// The sscanf return values are not checked, so inputBytes / durableBytes (declared without
// initializers) are left unset if an attribute does not match that format.
// NOTE(review): this span appears to carry both sides of a change - the signature and each sscanf
// call show up once for Standalone<StringRef> (via extractAttribute) and once for TraceEventFields
// (via getValue). Confirm which side is live before editing the code itself.
int64_t getQueueSize( Standalone<StringRef> md ) {
int64_t getQueueSize( TraceEventFields md ) {
double inputRate, durableRate;
double inputRoughness, durableRoughness;
int64_t inputBytes, durableBytes;
sscanf(extractAttribute(md.toString(), "BytesInput").c_str(), "%lf %lf %lld", &inputRate, &inputRoughness, &inputBytes);
sscanf(extractAttribute(md.toString(), "BytesDurable").c_str(), "%lf %lf %lld", &durableRate, &durableRoughness, &durableBytes);
sscanf(md.getValue("BytesInput").c_str(), "%lf %lf %lld", &inputRate, &inputRoughness, &inputBytes);
sscanf(md.getValue("BytesDurable").c_str(), "%lf %lf %lld", &durableRate, &durableRoughness, &durableBytes);
return inputBytes - durableBytes;
}
@ -110,7 +110,7 @@ ACTOR Future<int64_t> getMaxTLogQueueSize( Database cx, Reference<AsyncVar<Serve
workersMap[worker.first.address()] = worker.first;
}
state std::vector<Future<Standalone<StringRef>>> messages;
state std::vector<Future<TraceEventFields>> messages;
state std::vector<TLogInterface> tlogs = dbInfo->get().logSystemConfig.allPresentLogs();
for(int i = 0; i < tlogs.size(); i++) {
auto itr = workersMap.find(tlogs[i].address());
@ -182,7 +182,7 @@ ACTOR Future<int64_t> getMaxStorageServerQueueSize( Database cx, Reference<Async
workersMap[worker.first.address()] = worker.first;
}
state std::vector<Future<Standalone<StringRef>>> messages;
state std::vector<Future<TraceEventFields>> messages;
for(int i = 0; i < servers.size(); i++) {
auto itr = workersMap.find(servers[i].address());
if(itr == workersMap.end()) {
@ -224,17 +224,17 @@ ACTOR Future<int64_t> getDataDistributionQueueSize( Database cx, WorkerInterface
try {
TraceEvent("DataDistributionQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
Standalone<StringRef> movingDataMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
TraceEventFields movingDataMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( StringRef( cx->dbName.toString() + "/MovingData") ) ), 1.0 ) );
TraceEvent("DataDistributionQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "GotString").detail("Result", printable(movingDataMessage)).detail("TrackLatest", printable( StringRef( cx->dbName.toString() + "/MovingData") ) );
TraceEvent("DataDistributionQueueSize").detail("Database", printable(cx->dbName)).detail("Stage", "GotString")/*.detail("Result", printable(movingDataMessage))*/.detail("TrackLatest", printable( StringRef( cx->dbName.toString() + "/MovingData") ) );
int64_t inQueue;
sscanf(extractAttribute(movingDataMessage.toString(), "InQueue").c_str(), "%lld", &inQueue);
sscanf(movingDataMessage.getValue("InQueue").c_str(), "%lld", &inQueue);
if(reportInFlight) {
int64_t inFlight;
sscanf(extractAttribute(movingDataMessage.toString(), "InFlight").c_str(), "%lld", &inFlight);
sscanf(movingDataMessage.getValue("InFlight").c_str(), "%lld", &inFlight);
inQueue += inFlight;
}
@ -258,10 +258,10 @@ ACTOR Future<bool> getDataDistributionActive( Database cx, WorkerInterface maste
try {
TraceEvent("DataDistributionActive").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
Standalone<StringRef> activeMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
TraceEventFields activeMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( StringRef( cx->dbName.toString() + "/DDTrackerStarting") ) ), 1.0 ) );
return extractAttribute(activeMessage.toString(), "State") == "Active";
return activeMessage.getValue("State") == "Active";
} catch( Error &e ) {
TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract DataDistributionActive");
throw;
@ -273,10 +273,10 @@ ACTOR Future<bool> getStorageServersRecruiting( Database cx, Reference<AsyncVar<
try {
TraceEvent("StorageServersRecruiting").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
Standalone<StringRef> recruitingMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
TraceEventFields recruitingMessage = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( StringRef( cx->dbName.toString() + "/StorageServerRecruitment_" + dbInfo->get().master.id().toString()) ) ), 1.0 ) );
return extractAttribute(recruitingMessage.toString(), "State") == "Recruiting";
return recruitingMessage.getValue("State") == "Recruiting";
} catch( Error &e ) {
TraceEvent("QuietDatabaseFailure", masterWorker.id()).detail("Reason", "Failed to extract StorageServersRecruiting").detail("MasterID", dbInfo->get().master.id());
throw;

View File

@ -75,137 +75,28 @@ extern int limitReasonEnd;
extern const char* limitReasonName[];
extern const char* limitReasonDesc[];
// Decodes a double-quoted XML attribute value located at the very start of `xml`.
// Returns -1 if it fails to find a quoted string at the start of xml; returns the position beyond the close quote
// If decoded is not NULL, writes the decoded attribute value there
// Only the entities &amp; &lt; and &quot; are recognized; any other '&' sequence is treated as a failure.
// On failure `decoded` may hold the characters decoded so far (it is cleared on entry, not on error).
int decodeQuotedAttributeValue( StringRef xml, std::string* decoded ) {
if (decoded) decoded->clear();
if (!xml.size() || xml[0] != '"') return -1;
// Start just past the opening quote
int pos = 1;
// NOTE(review): this struct declaration looks like it was interleaved from the other side of the
// change rather than being part of this function - confirm.
struct WorkerEvents : std::map<NetworkAddress, TraceEventFields> {};
loop {
if (pos == xml.size()) return -1; // No closing quote
if (xml[pos]=='"') { pos++; break; } // Success
// Default: the character stands for itself
uint8_t out = xml[pos];
if (xml[pos] == '&') {
// Replace a recognized entity with its character and skip the whole entity
if (xml.substr(pos).startsWith(LiteralStringRef("&amp;"))) { out = '&'; pos += 5; }
else if (xml.substr(pos).startsWith(LiteralStringRef("&lt;"))) { out = '<'; pos += 4; }
else if (xml.substr(pos).startsWith(LiteralStringRef("&quot;"))) { out = '"'; pos += 6; }
else return -1;
} else
pos++;
if (decoded) decoded->push_back(out);
}
return pos;
}
// Scans a single XML element of the form <Tag Name="value" ... /> for the attribute named
// `attributeToExtract` and, if it is found, writes its decoded value to `ret`.
//
// Returns true when the attribute is present and its quoted value decodes cleanly.
// Returns false when the input does not start with '<', the attribute is absent, the input ends
// before an attribute's '=' is reached, or any attribute value up to and including the requested
// one is malformed. `ret` is cleared when the requested attribute's own value is malformed.
//
// This is only expected to parse the XML that Trace.cpp actually generates; we haven't looked at
// the standard to even find out what it doesn't try to do
bool tryExtractAttribute( StringRef expanded, StringRef attributeToExtract, std::string& ret ) {
	int pos = 0;

	// Consume '<'
	if (pos == expanded.size() || expanded[pos] != '<') return false;
	pos++;

	// Consume tag name
	while (pos != expanded.size() && expanded[pos] != ' ' && expanded[pos] != '/' && expanded[pos] != '>') pos++;

	while (pos != expanded.size() && expanded[pos] != '>' && expanded[pos] != '/') {
		// Consume whitespace
		while (pos != expanded.size() && expanded[pos] == ' ') pos++;

		// We should be looking at an attribute or the end of the string; find '=' at the end of the attribute, if any
		int eq_or_end = pos;
		while (eq_or_end != expanded.size() && expanded[eq_or_end]!='=' && expanded[eq_or_end]!='>') eq_or_end++;

		// The input ended inside (or just before) an attribute name. There is no value to decode,
		// and eq_or_end+1 would be an offset past the end of `expanded`, so give up here instead
		// of asking substr() for an out-of-range start.
		if (eq_or_end == expanded.size()) return false;

		if ( expanded.substr(pos, eq_or_end-pos) == attributeToExtract ) {
			// Found the attribute we want; decode the value
			int end = decodeQuotedAttributeValue(expanded.substr(eq_or_end+1), &ret);
			if (end<0) { ret.clear(); return false; }
			return true;
		}

		// We don't want this attribute, but we need to skip over its value
		// It looks like this *could* just be a scan for '"' characters
		int end = decodeQuotedAttributeValue(expanded.substr(eq_or_end+1), NULL);
		if (end<0) return false;
		pos = (eq_or_end+1)+end;
	}
	return false;
}
// Returns the decoded value of `attributeToExtract` within the XML element `expanded`.
// Throws attribute_not_found if the key is not found
std::string extractAttribute( StringRef expanded, StringRef attributeToExtract ) {
	std::string value;
	const bool found = tryExtractAttribute(expanded, attributeToExtract, value);
	if (found) {
		return value;
	}
	throw attribute_not_found();
}
std::string extractAttribute( std::string const& expanded, std::string const& attributeToExtract ) {
return extractAttribute(StringRef(expanded), StringRef(attributeToExtract));
}
// Fixed-input checks for tryExtractAttribute: entity decoding, backslash handling, lookup in a
// realistic event, and rejection of a partial attribute name.
TEST_CASE("fdbserver/Status/extractAttribute/basic") {
std::string a;
// &quot; entities in the value decode to literal double quotes
ASSERT( tryExtractAttribute(
LiteralStringRef("<Foo A=\"&quot;a&quot;\" B=\"\" />"),
LiteralStringRef("A"),
a) && a == LiteralStringRef("\"a\""));
// A backslash is not an escape character: the value of B is a single backslash character
ASSERT( tryExtractAttribute(
LiteralStringRef("<Foo A=\"&quot;a&quot;\" B=\"\\\" />"),
LiteralStringRef("B"),
a) && a == LiteralStringRef("\\") );
// The requested attribute is found after skipping several earlier attributes
ASSERT( tryExtractAttribute(
LiteralStringRef("<Event Severity=\"10\" Time=\"1415124565.129695\" Type=\"ProgramStart\" Machine=\"10.0.0.85:6863\" ID=\"0000000000000000\" RandomSeed=\"-2044671207\" SourceVersion=\"675cd9579467+ tip\" Version=\"3.0.0-PRERELEASE\" PackageName=\"3.0\" DataFolder=\"\" ConnectionString=\"circus:81060aa85f0a5b5b@10.0.0.5:4000,10.0.0.17:4000,10.0.0.78:4000,10.0.0.162:4000,10.0.0.182:4000\" ActualTime=\"1415124565\" CommandLine=\"fdbserver -r multitest -p auto:6863 -f /tmp/circus/testspec.txt --num_testers 24 --logdir /tmp/circus/multitest\" BuggifyEnabled=\"0\"/>"),
LiteralStringRef("Version"),
a) && a == LiteralStringRef("3.0.0-PRERELEASE") );
// "ersion" is only the tail of attribute names in this event, so the lookup must fail
ASSERT( !tryExtractAttribute(
LiteralStringRef("<Event Severity=\"10\" Time=\"1415124565.129695\" Type=\"ProgramStart\" Machine=\"10.0.0.85:6863\" ID=\"0000000000000000\" RandomSeed=\"-2044671207\" SourceVersion=\"675cd9579467+ tip\" Version=\"3.0.0-PRERELEASE\" PackageName=\"3.0\" DataFolder=\"\" ConnectionString=\"circus:81060aa85f0a5b5b@10.0.0.5:4000,10.0.0.17:4000,10.0.0.78:4000,10.0.0.162:4000,10.0.0.182:4000\" ActualTime=\"1415124565\" CommandLine=\"fdbserver -r multitest -p auto:6863 -f /tmp/circus/testspec.txt --num_testers 24 --logdir /tmp/circus/multitest\" BuggifyEnabled=\"0\"/>"),
LiteralStringRef("ersion"),
a) );
return Void();
}
// Mutation test for tryExtractAttribute: 100000 iterations, each corrupting one character of a
// known-good event string. The return value and `out` are ignored.
TEST_CASE("fdbserver/Status/extractAttribute/fuzz") {
// This is just looking for anything that crashes or infinite loops
std::string out;
for(int i=0; i<100000; i++)
{
// Start from a fresh copy of the well-formed event each time
std::string s = "<Event Severity=\"10\" Time=\"1415124565.129695\" Type=\"Program &quot;Start&quot;\" Machine=\"10.0.0.85:6863\" ID=\"0000000000000000\" RandomSeed=\"-2044671207\" SourceVersion=\"675cd9579467+ tip\" Version=\"3.0.0-PRERELEASE\" PackageName=\"3.0\" DataFolder=\"\" ConnectionString=\"circus:81060aa85f0a5b5b@10.0.0.5:4000,10.0.0.17:4000,10.0.0.78:4000,10.0.0.162:4000,10.0.0.182:4000\" ActualTime=\"1415124565\" CommandLine=\"fdbserver -r multitest -p auto:6863 -f /tmp/circus/testspec.txt --num_testers 24 --logdir /tmp/circus/multitest\" BuggifyEnabled=\"0\"/>";
// Overwrite one position with one of: double quote, space, '=', 'q', '0', backslash, '&'.
// The string length never changes, so truncated input is not exercised by this test.
// NOTE(review): presumably randomInt(0, s.size()) excludes the upper bound - confirm.
s[ g_random->randomInt(0, s.size()) ] = g_random->randomChoice(LiteralStringRef("\" =q0\\&"));
tryExtractAttribute(s, LiteralStringRef("Version"), out);
}
return Void();
}
struct WorkerEvents : std::map<NetworkAddress, std::string> {};
// Asks `worker` for its latest logged event via eventLogRequest. A non-empty `eventName` is sent
// as the request's argument; an empty name sends a default-constructed EventLogRequest.
// The reply is wrapped in timeoutError(..., 2.0) (presumably seconds - confirm) and errorOr.
// Returns an empty Optional if the reply is an error or if any other Error is caught;
// actor_cancelled is rethrown so cancellation still propagates.
// NOTE(review): this span appears to carry both sides of a change - the signature, the eventTrace
// declaration and each return show up once for std::string / Standalone<StringRef> and once for
// TraceEventFields. Confirm which side is live before editing the code itself.
ACTOR static Future< Optional<std::string> > latestEventOnWorker(WorkerInterface worker, std::string eventName) {
ACTOR static Future< Optional<TraceEventFields> > latestEventOnWorker(WorkerInterface worker, std::string eventName) {
try {
EventLogRequest req = eventName.size() > 0 ? EventLogRequest(Standalone<StringRef>(eventName)) : EventLogRequest();
ErrorOr<Standalone<StringRef>> eventTrace = wait( errorOr(timeoutError(worker.eventLogRequest.getReply(req), 2.0)));
ErrorOr<TraceEventFields> eventTrace = wait( errorOr(timeoutError(worker.eventLogRequest.getReply(req), 2.0)));
if (eventTrace.isError()){
return Optional<std::string>();
return Optional<TraceEventFields>();
}
return eventTrace.get().toString();
return eventTrace.get();
}
catch (Error &e){
// Cancellation must not be swallowed; every other error becomes "no event"
if (e.code() == error_code_actor_cancelled)
throw;
return Optional<std::string>();
return Optional<TraceEventFields>();
}
}
ACTOR static Future< Optional< std::pair<WorkerEvents, std::set<std::string>> > > latestEventOnWorkers(std::vector<std::pair<WorkerInterface, ProcessClass>> workers, std::string eventName) {
try {
state vector<Future<ErrorOr<Standalone<StringRef>>>> eventTraces;
state vector<Future<ErrorOr<TraceEventFields>>> eventTraces;
for (int c = 0; c < workers.size(); c++) {
EventLogRequest req = eventName.size() > 0 ? EventLogRequest(Standalone<StringRef>(eventName)) : EventLogRequest();
eventTraces.push_back(errorOr(timeoutError(workers[c].first.eventLogRequest.getReply(req), 2.0)));
@ -217,13 +108,13 @@ ACTOR static Future< Optional< std::pair<WorkerEvents, std::set<std::string>> >
WorkerEvents results;
for (int i = 0; i < eventTraces.size(); i++) {
ErrorOr<Standalone<StringRef>> v = eventTraces[i].get();
const ErrorOr<TraceEventFields>& v = eventTraces[i].get();
if (v.isError()){
failed.insert(workers[i].first.address().toString());
results[workers[i].first.address()] = "";
results[workers[i].first.address()] = TraceEventFields();
}
else {
results[workers[i].first.address()] = v.get().toString();
results[workers[i].first.address()] = v.get();
}
}
@ -342,21 +233,21 @@ static StatusObject getLocalityInfo(const LocalityData& locality) {
return localityObj;
}
static StatusObject getError(std::string error) {
static StatusObject getError(const TraceEventFields& errorFields) {
StatusObject statusObj;
try {
if (error.size()) {
double time = atof(extractAttribute(error, "Time").c_str());
if (errorFields.size()) {
double time = atof(errorFields.getValue("Time").c_str());
statusObj["time"] = time;
statusObj["raw_log_message"] = error;
statusObj["raw_log_message"] = errorFields.toString();
std::string type = extractAttribute(error, "Type");
std::string type = errorFields.getValue("Type");
statusObj["type"] = type;
std::string description = type;
std::string errorName;
if (tryExtractAttribute(error, LiteralStringRef("Error"), errorName)) {
if(errorFields.tryGetValue("Error", errorName)) {
statusObj["name"] = errorName;
description += ": " + errorName;
}
@ -374,7 +265,7 @@ static StatusObject getError(std::string error) {
}
}
catch (Error &e){
TraceEvent(SevError, "StatusGetErrorError").error(e).detail("RawError", error);
TraceEvent(SevError, "StatusGetErrorError").error(e).detail("RawError", errorFields.toString());
}
return statusObj;
}
@ -385,7 +276,7 @@ static StatusObject machineStatusFetcher(WorkerEvents mMetrics, vector<std::pair
int failed = 0;
// map from machine networkAddress to datacenter ID
WorkerEvents dcIds;
std::map<NetworkAddress, std::string> dcIds;
std::map<NetworkAddress, LocalityData> locality;
for (auto worker : workers){
@ -401,12 +292,12 @@ static StatusObject machineStatusFetcher(WorkerEvents mMetrics, vector<std::pair
}
StatusObject statusObj; // Represents the status for a machine
std::string event = it->second;
const TraceEventFields& event = it->second;
try {
std::string address = toIPString(it->first.ip);
// We will use the "physical" calculated machine ID here to limit exposure to machineID repurposing
std::string machineId = extractAttribute(event, "MachineID");
std::string machineId = event.getValue("MachineID");
// If this machine ID does not already exist in the machineMap, add it
if (!machineMap.count(machineId)) {
@ -424,23 +315,23 @@ static StatusObject machineStatusFetcher(WorkerEvents mMetrics, vector<std::pair
StatusObject memoryObj;
metric = parseDouble(extractAttribute(event, "TotalMemory"));
metric = parseDouble(event.getValue("TotalMemory"));
memoryObj["total_bytes"] = metric;
metric = parseDouble(extractAttribute(event, "CommittedMemory"));
metric = parseDouble(event.getValue("CommittedMemory"));
memoryObj["committed_bytes"] = metric;
metric = parseDouble(extractAttribute(event, "AvailableMemory"));
metric = parseDouble(event.getValue("AvailableMemory"));
memoryObj["free_bytes"] = metric;
statusObj["memory"] = memoryObj;
StatusObject cpuObj;
metric = parseDouble(extractAttribute(event, "CPUSeconds"));
metric = parseDouble(event.getValue("CPUSeconds"));
double cpu_seconds = metric;
metric = parseDouble(extractAttribute(event, "Elapsed"));
metric = parseDouble(event.getValue("Elapsed"));
double elapsed = metric;
if (elapsed > 0){
@ -451,17 +342,17 @@ static StatusObject machineStatusFetcher(WorkerEvents mMetrics, vector<std::pair
StatusObject networkObj;
metric = parseDouble(extractAttribute(event, "MbpsSent"));
metric = parseDouble(event.getValue("MbpsSent"));
StatusObject megabits_sent;
megabits_sent["hz"] = metric;
networkObj["megabits_sent"] = megabits_sent;
metric = parseDouble(extractAttribute(event, "MbpsReceived"));
metric = parseDouble(event.getValue("MbpsReceived"));
StatusObject megabits_received;
megabits_received["hz"] = metric;
networkObj["megabits_received"] = megabits_received;
metric = parseDouble(extractAttribute(event, "RetransSegs"));
metric = parseDouble(event.getValue("RetransSegs"));
StatusObject retransSegsObj;
if (elapsed > 0){
retransSegsObj["hz"] = metric / elapsed;
@ -512,25 +403,25 @@ struct RolesInfo {
obj["role"] = role;
return roles.insert( make_pair(address, obj ))->second;
}
StatusObject& addRole(std::string const& role, StorageServerInterface& iface, std::string const& metrics, Version maxTLogVersion) {
StatusObject& addRole(std::string const& role, StorageServerInterface& iface, TraceEventFields const& metrics, Version maxTLogVersion) {
StatusObject obj;
obj["id"] = iface.id().shortString();
obj["role"] = role;
try {
obj["stored_bytes"] = parseInt64(extractAttribute(metrics, "BytesStored"));
obj["kvstore_used_bytes"] = parseInt64(extractAttribute(metrics, "KvstoreBytesUsed"));
obj["kvstore_free_bytes"] = parseInt64(extractAttribute(metrics, "KvstoreBytesFree"));
obj["kvstore_available_bytes"] = parseInt64(extractAttribute(metrics, "KvstoreBytesAvailable"));
obj["kvstore_total_bytes"] = parseInt64(extractAttribute(metrics, "KvstoreBytesTotal"));
obj["input_bytes"] = parseCounter(extractAttribute(metrics, "BytesInput"));
obj["durable_bytes"] = parseCounter(extractAttribute(metrics, "BytesDurable"));
obj["query_queue_max"] = parseInt(extractAttribute(metrics, "QueryQueueMax"));
obj["finished_queries"] = parseCounter(extractAttribute(metrics, "FinishedQueries"));
obj["stored_bytes"] = parseInt64(metrics.getValue("BytesStored"));
obj["kvstore_used_bytes"] = parseInt64(metrics.getValue("KvstoreBytesUsed"));
obj["kvstore_free_bytes"] = parseInt64(metrics.getValue("KvstoreBytesFree"));
obj["kvstore_available_bytes"] = parseInt64(metrics.getValue("KvstoreBytesAvailable"));
obj["kvstore_total_bytes"] = parseInt64(metrics.getValue("KvstoreBytesTotal"));
obj["input_bytes"] = parseCounter(metrics.getValue("BytesInput"));
obj["durable_bytes"] = parseCounter(metrics.getValue("BytesDurable"));
obj["query_queue_max"] = parseInt(metrics.getValue("QueryQueueMax"));
obj["finished_queries"] = parseCounter(metrics.getValue("FinishedQueries"));
Version version = parseInt64(extractAttribute(metrics, "Version"));
Version version = parseInt64(metrics.getValue("Version"));
obj["data_version"] = version;
int64_t versionLag = parseInt64(extractAttribute(metrics, "VersionLag"));
int64_t versionLag = parseInt64(metrics.getValue("VersionLag"));
if(maxTLogVersion > 0) {
// It's possible that the storage server hasn't talked to the logs recently, in which case it may not be aware of how far behind it is.
// To account for that, we also compute the version difference between each storage server and the tlog with the largest version.
@ -552,22 +443,22 @@ struct RolesInfo {
}
return roles.insert( make_pair(iface.address(), obj ))->second;
}
StatusObject& addRole(std::string const& role, TLogInterface& iface, std::string const& metrics) {
StatusObject& addRole(std::string const& role, TLogInterface& iface, TraceEventFields const& metrics) {
StatusObject obj;
obj["id"] = iface.id().shortString();
obj["role"] = role;
try {
obj["kvstore_used_bytes"] = parseInt64(extractAttribute(metrics, "KvstoreBytesUsed"));
obj["kvstore_free_bytes"] = parseInt64(extractAttribute(metrics, "KvstoreBytesFree"));
obj["kvstore_available_bytes"] = parseInt64(extractAttribute(metrics, "KvstoreBytesAvailable"));
obj["kvstore_total_bytes"] = parseInt64(extractAttribute(metrics, "KvstoreBytesTotal"));
obj["queue_disk_used_bytes"] = parseInt64(extractAttribute(metrics, "QueueDiskBytesUsed"));
obj["queue_disk_free_bytes"] = parseInt64(extractAttribute(metrics, "QueueDiskBytesFree"));
obj["queue_disk_available_bytes"] = parseInt64(extractAttribute(metrics, "QueueDiskBytesAvailable"));
obj["queue_disk_total_bytes"] = parseInt64(extractAttribute(metrics, "QueueDiskBytesTotal"));
obj["input_bytes"] = parseCounter(extractAttribute(metrics, "BytesInput"));
obj["durable_bytes"] = parseCounter(extractAttribute(metrics, "BytesDurable"));
obj["data_version"] = parseInt64(extractAttribute(metrics, "Version"));
obj["kvstore_used_bytes"] = parseInt64(metrics.getValue("KvstoreBytesUsed"));
obj["kvstore_free_bytes"] = parseInt64(metrics.getValue("KvstoreBytesFree"));
obj["kvstore_available_bytes"] = parseInt64(metrics.getValue("KvstoreBytesAvailable"));
obj["kvstore_total_bytes"] = parseInt64(metrics.getValue("KvstoreBytesTotal"));
obj["queue_disk_used_bytes"] = parseInt64(metrics.getValue("QueueDiskBytesUsed"));
obj["queue_disk_free_bytes"] = parseInt64(metrics.getValue("QueueDiskBytesFree"));
obj["queue_disk_available_bytes"] = parseInt64(metrics.getValue("QueueDiskBytesAvailable"));
obj["queue_disk_total_bytes"] = parseInt64(metrics.getValue("QueueDiskBytesTotal"));
obj["input_bytes"] = parseCounter(metrics.getValue("BytesInput"));
obj["durable_bytes"] = parseCounter(metrics.getValue("BytesDurable"));
obj["data_version"] = parseInt64(metrics.getValue("Version"));
} catch (Error& e) {
if(e.code() != error_code_attribute_not_found)
throw e;
@ -598,8 +489,8 @@ ACTOR static Future<StatusObject> processStatusFetcher(
WorkerEvents traceFileOpenErrors,
WorkerEvents programStarts,
std::map<std::string, StatusObject> processIssues,
vector<std::pair<StorageServerInterface, std::string>> storageServers,
vector<std::pair<TLogInterface, std::string>> tLogs,
vector<std::pair<StorageServerInterface, TraceEventFields>> storageServers,
vector<std::pair<TLogInterface, TraceEventFields>> tLogs,
Database cx,
Optional<DatabaseConfiguration> configuration,
std::set<std::string> *incomplete_reasons) {
@ -616,10 +507,10 @@ ACTOR static Future<StatusObject> processStatusFetcher(
Void _ = wait(yield());
if (traceFileErrorsItr->second.size()){
try {
// Have event string, parse it and turn it into a message object describing the trace file opening error
std::string event = traceFileErrorsItr->second;
std::string fileName = extractAttribute(event, "Filename");
StatusObject msgObj = makeMessage("file_open_error", format("Could not open file '%s' (%s).", fileName.c_str(), extractAttribute(event, "Error").c_str()).c_str());
// Have event fields, parse them and turn them into a message object describing the trace file opening error
const TraceEventFields& event = traceFileErrorsItr->second;
std::string fileName = event.getValue("Filename");
StatusObject msgObj = makeMessage("file_open_error", format("Could not open file '%s' (%s).", fileName.c_str(), event.getValue("Error").c_str()).c_str());
msgObj["file_name"] = fileName;
// Map the address of the worker to the error message object
@ -638,11 +529,11 @@ ACTOR static Future<StatusObject> processStatusFetcher(
state std::map<Optional<Standalone<StringRef>>, MachineMemoryInfo>::iterator memInfo = machineMemoryUsage.insert(std::make_pair(workerItr->first.locality.machineId(), MachineMemoryInfo())).first;
try {
ASSERT(pMetrics.count(workerItr->first.address()));
std::string processMetrics = pMetrics[workerItr->first.address()];
const TraceEventFields& processMetrics = pMetrics[workerItr->first.address()];
if(memInfo->second.valid()) {
if(processMetrics.size() > 0) {
memInfo->second.memoryUsage += parseDouble(extractAttribute(processMetrics, "Memory"));
memInfo->second.memoryUsage += parseDouble(processMetrics.getValue("Memory"));
++memInfo->second.numProcesses;
}
else
@ -668,7 +559,7 @@ ACTOR static Future<StatusObject> processStatusFetcher(
}
}
state std::vector<std::pair<TLogInterface, std::string>>::iterator log;
state std::vector<std::pair<TLogInterface, TraceEventFields>>::iterator log;
state Version maxTLogVersion = 0;
for(log = tLogs.begin(); log != tLogs.end(); ++log) {
StatusObject const& roleStatus = roles.addRole( "log", log->first, log->second );
@ -678,7 +569,7 @@ ACTOR static Future<StatusObject> processStatusFetcher(
Void _ = wait(yield());
}
state std::vector<std::pair<StorageServerInterface, std::string>>::iterator ss;
state std::vector<std::pair<StorageServerInterface, TraceEventFields>>::iterator ss;
state std::map<NetworkAddress, int64_t> ssLag;
for(ss = storageServers.begin(); ss != storageServers.end(); ++ss) {
StatusObject const& roleStatus = roles.addRole( "storage", ss->first, ss->second, maxTLogVersion );
@ -704,45 +595,45 @@ ACTOR static Future<StatusObject> processStatusFetcher(
processMap[printable(workerItr->first.locality.processId())] = StatusObject();
NetworkAddress address = workerItr->first.address();
std::string event = pMetrics[workerItr->first.address()];
const TraceEventFields& event = pMetrics[workerItr->first.address()];
statusObj["address"] = address.toString();
StatusObject memoryObj;
if (event.size() > 0) {
std::string zoneID = extractAttribute(event, "ZoneID");
std::string zoneID = event.getValue("ZoneID");
statusObj["fault_domain"] = zoneID;
std::string MachineID = extractAttribute(event, "MachineID");
std::string MachineID = event.getValue("MachineID");
statusObj["machine_id"] = MachineID;
statusObj["locality"] = getLocalityInfo(workerItr->first.locality);
statusObj["uptime_seconds"] = parseDouble(extractAttribute(event, "UptimeSeconds"));
statusObj["uptime_seconds"] = parseDouble(event.getValue("UptimeSeconds"));
metric = parseDouble(extractAttribute(event, "CPUSeconds"));
metric = parseDouble(event.getValue("CPUSeconds"));
double cpu_seconds = metric;
// rates are calculated over the last elapsed seconds
metric = parseDouble(extractAttribute(event, "Elapsed"));
metric = parseDouble(event.getValue("Elapsed"));
double elapsed = metric;
metric = parseDouble(extractAttribute(event, "DiskIdleSeconds"));
metric = parseDouble(event.getValue("DiskIdleSeconds"));
double diskIdleSeconds = metric;
metric = parseDouble(extractAttribute(event, "DiskReads"));
metric = parseDouble(event.getValue("DiskReads"));
double diskReads = metric;
metric = parseDouble(extractAttribute(event, "DiskWrites"));
metric = parseDouble(event.getValue("DiskWrites"));
double diskWrites = metric;
uint64_t diskReadsCount = parseInt64(extractAttribute(event, "DiskReadsCount"));
uint64_t diskReadsCount = parseInt64(event.getValue("DiskReadsCount"));
uint64_t diskWritesCount = parseInt64(extractAttribute(event, "DiskWritesCount"));
uint64_t diskWritesCount = parseInt64(event.getValue("DiskWritesCount"));
metric = parseDouble(extractAttribute(event, "DiskWriteSectors"));
metric = parseDouble(event.getValue("DiskWriteSectors"));
double diskWriteSectors = metric;
metric = parseDouble(extractAttribute(event, "DiskReadSectors"));
metric = parseDouble(event.getValue("DiskReadSectors"));
double diskReadSectors = metric;
StatusObject diskObj;
@ -769,39 +660,39 @@ ACTOR static Future<StatusObject> processStatusFetcher(
diskObj["writes"] = writesObj;
}
diskObj["total_bytes"] = parseInt64(extractAttribute(event, "DiskTotalBytes"));
diskObj["free_bytes"] = parseInt64(extractAttribute(event, "DiskFreeBytes"));
diskObj["total_bytes"] = parseInt64(event.getValue("DiskTotalBytes"));
diskObj["free_bytes"] = parseInt64(event.getValue("DiskFreeBytes"));
statusObj["disk"] = diskObj;
StatusObject networkObj;
networkObj["current_connections"] = parseInt64(extractAttribute(event, "CurrentConnections"));
networkObj["current_connections"] = parseInt64(event.getValue("CurrentConnections"));
StatusObject connections_established;
connections_established["hz"] = parseDouble(extractAttribute(event, "ConnectionsEstablished"));
connections_established["hz"] = parseDouble(event.getValue("ConnectionsEstablished"));
networkObj["connections_established"] = connections_established;
StatusObject connections_closed;
connections_closed["hz"] = parseDouble(extractAttribute(event, "ConnectionsClosed"));
connections_closed["hz"] = parseDouble(event.getValue("ConnectionsClosed"));
networkObj["connections_closed"] = connections_closed;
StatusObject connection_errors;
connection_errors["hz"] = parseDouble(extractAttribute(event, "ConnectionErrors"));
connection_errors["hz"] = parseDouble(event.getValue("ConnectionErrors"));
networkObj["connection_errors"] = connection_errors;
metric = parseDouble(extractAttribute(event, "MbpsSent"));
metric = parseDouble(event.getValue("MbpsSent"));
StatusObject megabits_sent;
megabits_sent["hz"] = metric;
networkObj["megabits_sent"] = megabits_sent;
metric = parseDouble(extractAttribute(event, "MbpsReceived"));
metric = parseDouble(event.getValue("MbpsReceived"));
StatusObject megabits_received;
megabits_received["hz"] = metric;
networkObj["megabits_received"] = megabits_received;
statusObj["network"] = networkObj;
metric = parseDouble(extractAttribute(event, "Memory"));
metric = parseDouble(event.getValue("Memory"));
memoryObj["used_bytes"] = metric;
metric = parseDouble(extractAttribute(event, "UnusedAllocatedMemory"));
metric = parseDouble(event.getValue("UnusedAllocatedMemory"));
memoryObj["unused_allocated_memory"] = metric;
}
@ -809,16 +700,16 @@ ACTOR static Future<StatusObject> processStatusFetcher(
auto const& psxml = programStarts.at(address);
if(psxml.size() > 0) {
int64_t memLimit = parseInt64(extractAttribute(psxml, "MemoryLimit"));
int64_t memLimit = parseInt64(psxml.getValue("MemoryLimit"));
memoryObj["limit_bytes"] = memLimit;
std::string version;
if (tryExtractAttribute(psxml, LiteralStringRef("Version"), version)) {
if (psxml.tryGetValue("Version", version)) {
statusObj["version"] = version;
}
std::string commandLine;
if (tryExtractAttribute(psxml, LiteralStringRef("CommandLine"), commandLine)) {
if (psxml.tryGetValue("CommandLine", commandLine)) {
statusObj["command_line"] = commandLine;
}
}
@ -827,7 +718,7 @@ ACTOR static Future<StatusObject> processStatusFetcher(
// if this process address is in the machine metrics
if (mMetrics.count(address) && mMetrics[address].size()){
double availableMemory;
availableMemory = parseDouble(extractAttribute(mMetrics[address], "AvailableMemory"));
availableMemory = parseDouble(mMetrics[address].getValue("AvailableMemory"));
auto machineMemInfo = machineMemoryUsage[workerItr->first.locality.machineId()];
if (machineMemInfo.valid()) {
@ -930,8 +821,8 @@ ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInt
state StatusObject message;
try {
Standalone<StringRef> md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = parseInt( extractAttribute(md, LiteralStringRef("StatusCode")) );
TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest( LiteralStringRef("MasterRecoveryState") ) ), 1.0) );
state int mStatusCode = parseInt( md.getValue("StatusCode") );
if (mStatusCode < 0 || mStatusCode >= RecoveryStatus::END)
throw attribute_not_found();
@ -939,9 +830,9 @@ ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInt
// Add additional metadata for certain statuses
if (mStatusCode == RecoveryStatus::recruiting_transaction_servers) {
int requiredLogs = atoi( extractAttribute(md, LiteralStringRef("RequiredTLogs")).c_str() );
int requiredProxies = atoi( extractAttribute(md, LiteralStringRef("RequiredProxies")).c_str() );
int requiredResolvers = atoi( extractAttribute(md, LiteralStringRef("RequiredResolvers")).c_str() );
int requiredLogs = atoi( md.getValue("RequiredTLogs").c_str() );
int requiredProxies = atoi( md.getValue("RequiredProxies").c_str() );
int requiredResolvers = atoi( md.getValue("RequiredResolvers").c_str() );
//int requiredProcesses = std::max(requiredLogs, std::max(requiredResolvers, requiredProxies));
//int requiredMachines = std::max(requiredLogs, 1);
@ -949,7 +840,7 @@ ACTOR static Future<StatusObject> recoveryStateStatusFetcher(std::pair<WorkerInt
message["required_proxies"] = requiredProxies;
message["required_resolvers"] = requiredResolvers;
} else if (mStatusCode == RecoveryStatus::locking_old_transaction_servers) {
message["missing_logs"] = extractAttribute(md, LiteralStringRef("MissingIDs")).c_str();
message["missing_logs"] = md.getValue("MissingIDs").c_str();
}
// TODO: time_in_recovery: 0.5
// time_in_state: 0.1
@ -1177,32 +1068,32 @@ ACTOR static Future<StatusObject> dataStatusFetcher(std::pair<WorkerInterface, P
state StatusObject statusObjData;
try {
std::vector<Future<Standalone<StringRef>>> futures;
std::vector<Future<TraceEventFields>> futures;
// TODO: Should this be serial?
futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/DDTrackerStarting"))), 1.0));
futures.push_back(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/DDTrackerStats"))), 1.0));
std::vector<Standalone<StringRef>> dataInfo = wait(getAll(futures));
std::vector<TraceEventFields> dataInfo = wait(getAll(futures));
Standalone<StringRef> startingStats = dataInfo[0];
state Standalone<StringRef> dataStats = dataInfo[1];
TraceEventFields startingStats = dataInfo[0];
state TraceEventFields dataStats = dataInfo[1];
if (startingStats.size() && extractAttribute(startingStats, LiteralStringRef("State")) != "Active") {
if (startingStats.size() && startingStats.getValue("State") != "Active") {
stateSectionObj["name"] = "initializing";
stateSectionObj["description"] = "(Re)initializing automatic data distribution";
}
else {
state Standalone<StringRef> md = wait(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/MovingData"))), 1.0));
state TraceEventFields md = wait(timeoutError(mWorker.first.eventLogRequest.getReply(EventLogRequest(StringRef(dbName + "/MovingData"))), 1.0));
// If we have a MovingData message, parse it.
if (md.size())
{
int64_t partitionsInQueue = parseInt64(extractAttribute(md, LiteralStringRef("InQueue")));
int64_t partitionsInFlight = parseInt64(extractAttribute(md, LiteralStringRef("InFlight")));
int64_t averagePartitionSize = parseInt64(extractAttribute(md, LiteralStringRef("AverageShardSize")));
int64_t totalBytesWritten = parseInt64(extractAttribute(md, LiteralStringRef("BytesWritten")));
int highestPriority = parseInt(extractAttribute(md, LiteralStringRef("HighestPriority")));
int64_t partitionsInQueue = parseInt64(md.getValue("InQueue"));
int64_t partitionsInFlight = parseInt64(md.getValue("InFlight"));
int64_t averagePartitionSize = parseInt64(md.getValue("AverageShardSize"));
int64_t totalBytesWritten = parseInt64(md.getValue("BytesWritten"));
int highestPriority = parseInt(md.getValue("HighestPriority"));
if( averagePartitionSize >= 0 ) {
StatusObject moving_data;
@ -1265,9 +1156,9 @@ ACTOR static Future<StatusObject> dataStatusFetcher(std::pair<WorkerInterface, P
if (dataStats.size())
{
int64_t totalDBBytes = parseInt64(extractAttribute(dataStats, LiteralStringRef("TotalSizeBytes")));
int64_t totalDBBytes = parseInt64(dataStats.getValue("TotalSizeBytes"));
statusObjData["total_kv_size_bytes"] = totalDBBytes;
int shards = parseInt(extractAttribute(dataStats, LiteralStringRef("Shards")));
int shards = parseInt(dataStats.getValue("Shards"));
statusObjData["partitions_count"] = shards;
}
@ -1299,30 +1190,30 @@ namespace std
}
ACTOR template <class iface>
static Future<vector<std::pair<iface, std::string>>> getServerMetrics(vector<iface> servers, std::unordered_map<NetworkAddress, WorkerInterface> address_workers, std::string suffix) {
state vector<Future<Optional<std::string>>> futures;
static Future<vector<std::pair<iface, TraceEventFields>>> getServerMetrics(vector<iface> servers, std::unordered_map<NetworkAddress, WorkerInterface> address_workers, std::string suffix) {
state vector<Future<Optional<TraceEventFields>>> futures;
for (auto s : servers) {
futures.push_back(latestEventOnWorker(address_workers[s.address()], s.id().toString() + suffix));
}
Void _ = wait(waitForAll(futures));
vector<std::pair<iface, std::string>> results;
vector<std::pair<iface, TraceEventFields>> results;
for (int i = 0; i < servers.size(); i++) {
results.push_back(std::make_pair(servers[i], futures[i].get().present() ? futures[i].get().get() : ""));
results.push_back(std::make_pair(servers[i], futures[i].get().present() ? futures[i].get().get() : TraceEventFields()));
}
return results;
}
ACTOR static Future<vector<std::pair<StorageServerInterface, std::string>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
ACTOR static Future<vector<std::pair<StorageServerInterface, TraceEventFields>>> getStorageServersAndMetrics(Database cx, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<StorageServerInterface> servers = wait(timeoutError(getStorageServers(cx, true), 5.0));
vector<std::pair<StorageServerInterface, std::string>> results = wait(getServerMetrics(servers, address_workers, "/StorageMetrics"));
vector<std::pair<StorageServerInterface, TraceEventFields>> results = wait(getServerMetrics(servers, address_workers, "/StorageMetrics"));
return results;
}
ACTOR static Future<vector<std::pair<TLogInterface, std::string>>> getTLogsAndMetrics(Reference<AsyncVar<struct ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
ACTOR static Future<vector<std::pair<TLogInterface, TraceEventFields>>> getTLogsAndMetrics(Reference<AsyncVar<struct ServerDBInfo>> db, std::unordered_map<NetworkAddress, WorkerInterface> address_workers) {
vector<TLogInterface> servers = db->get().logSystemConfig.allPresentLogs();
vector<std::pair<TLogInterface, std::string>> results = wait(getServerMetrics(servers, address_workers, "/TLogMetrics"));
vector<std::pair<TLogInterface, TraceEventFields>> results = wait(getServerMetrics(servers, address_workers, "/TLogMetrics"));
return results;
}
@ -1358,7 +1249,7 @@ static int getExtraTLogEligibleMachines(vector<std::pair<WorkerInterface, Proces
}
ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struct ServerDBInfo>> db, vector<std::pair<WorkerInterface, ProcessClass>> workers, std::pair<WorkerInterface, ProcessClass> mWorker,
std::string dbName, StatusObject *qos, StatusObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, std::string>>>> storageServerFuture)
std::string dbName, StatusObject *qos, StatusObject *data_overlay, std::set<std::string> *incomplete_reasons, Future<ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>>> storageServerFuture)
{
state StatusObject statusObj;
state StatusObject operationsObj;
@ -1367,7 +1258,7 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
// Writes and conflicts
try {
vector<Future<Standalone<StringRef>>> proxyStatFutures;
vector<Future<TraceEventFields>> proxyStatFutures;
std::map<NetworkAddress, std::pair<WorkerInterface, ProcessClass>> workersMap;
for (auto w : workers) {
workersMap[w.first.address()] = w;
@ -1379,16 +1270,16 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
else
throw all_alternatives_failed(); // We need data from all proxies for this result to be trustworthy
}
vector<Standalone<StringRef>> proxyStats = wait(getAll(proxyStatFutures));
vector<TraceEventFields> proxyStats = wait(getAll(proxyStatFutures));
StatusObject mutations=makeCounter(), mutationBytes=makeCounter(), txnConflicts=makeCounter(), txnStartOut=makeCounter(), txnCommitOutSuccess=makeCounter();
for (auto &ps : proxyStats) {
mutations = addCounters( mutations, parseCounter(extractAttribute(ps, LiteralStringRef("Mutations"))) );
mutationBytes = addCounters( mutationBytes, parseCounter(extractAttribute(ps, LiteralStringRef("MutationBytes"))) );
txnConflicts = addCounters( txnConflicts, parseCounter(extractAttribute(ps, LiteralStringRef("TxnConflicts"))) );
txnStartOut = addCounters( txnStartOut, parseCounter(extractAttribute(ps, LiteralStringRef("TxnStartOut"))) );
txnCommitOutSuccess = addCounters( txnCommitOutSuccess, parseCounter(extractAttribute(ps, LiteralStringRef("TxnCommitOutSuccess"))) );
mutations = addCounters( mutations, parseCounter(ps.getValue("Mutations")) );
mutationBytes = addCounters( mutationBytes, parseCounter(ps.getValue("MutationBytes")) );
txnConflicts = addCounters( txnConflicts, parseCounter(ps.getValue("TxnConflicts")) );
txnStartOut = addCounters( txnStartOut, parseCounter(ps.getValue("TxnStartOut")) );
txnCommitOutSuccess = addCounters( txnCommitOutSuccess, parseCounter(ps.getValue("TxnCommitOutSuccess")) );
}
operationsObj["writes"] = mutations;
@ -1409,19 +1300,19 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
// Transactions
try {
Standalone<StringRef> md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest(StringRef(dbName+"/RkUpdate") ) ), 1.0) );
double tpsLimit = parseDouble(extractAttribute(md, LiteralStringRef("TPSLimit")));
double transPerSec = parseDouble(extractAttribute(md, LiteralStringRef("ReleasedTPS")));
int ssCount = parseInt(extractAttribute(md, LiteralStringRef("StorageServers")));
int tlogCount = parseInt(extractAttribute(md, LiteralStringRef("TLogs")));
int64_t worstFreeSpaceStorageServer = parseInt64(extractAttribute(md, LiteralStringRef("WorstFreeSpaceStorageServer")));
int64_t worstFreeSpaceTLog = parseInt64(extractAttribute(md, LiteralStringRef("WorstFreeSpaceTLog")));
int64_t worstStorageServerQueue = parseInt64(extractAttribute(md, LiteralStringRef("WorstStorageServerQueue")));
int64_t limitingStorageServerQueue = parseInt64(extractAttribute(md, LiteralStringRef("LimitingStorageServerQueue")));
int64_t worstTLogQueue = parseInt64(extractAttribute(md, LiteralStringRef("WorstTLogQueue")));
int64_t totalDiskUsageBytes = parseInt64(extractAttribute(md, LiteralStringRef("TotalDiskUsageBytes")));
int64_t worstVersionLag = parseInt64(extractAttribute(md, LiteralStringRef("WorstStorageServerVersionLag")));
int64_t limitingVersionLag = parseInt64(extractAttribute(md, LiteralStringRef("LimitingStorageServerVersionLag")));
TraceEventFields md = wait( timeoutError(mWorker.first.eventLogRequest.getReply( EventLogRequest(StringRef(dbName+"/RkUpdate") ) ), 1.0) );
double tpsLimit = parseDouble(md.getValue("TPSLimit"));
double transPerSec = parseDouble(md.getValue("ReleasedTPS"));
int ssCount = parseInt(md.getValue("StorageServers"));
int tlogCount = parseInt(md.getValue("TLogs"));
int64_t worstFreeSpaceStorageServer = parseInt64(md.getValue("WorstFreeSpaceStorageServer"));
int64_t worstFreeSpaceTLog = parseInt64(md.getValue("WorstFreeSpaceTLog"));
int64_t worstStorageServerQueue = parseInt64(md.getValue("WorstStorageServerQueue"));
int64_t limitingStorageServerQueue = parseInt64(md.getValue("LimitingStorageServerQueue"));
int64_t worstTLogQueue = parseInt64(md.getValue("WorstTLogQueue"));
int64_t totalDiskUsageBytes = parseInt64(md.getValue("TotalDiskUsageBytes"));
int64_t worstVersionLag = parseInt64(md.getValue("WorstStorageServerVersionLag"));
int64_t limitingVersionLag = parseInt64(md.getValue("LimitingStorageServerVersionLag"));
(*data_overlay)["total_disk_used_bytes"] = totalDiskUsageBytes;
if(ssCount > 0) {
@ -1440,13 +1331,13 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
(*qos)["transactions_per_second_limit"] = tpsLimit;
(*qos)["released_transactions_per_second"] = transPerSec;
int reason = parseInt(extractAttribute(md, LiteralStringRef("Reason")));
int reason = parseInt(md.getValue("Reason"));
StatusObject perfLimit;
if (transPerSec > tpsLimit * 0.8) {
// If reason is known, set qos.performance_limited_by, otherwise omit
if (reason >= 0 && reason < limitReasonEnd) {
perfLimit = makeMessage(limitReasonName[reason], limitReasonDesc[reason]);
std::string reason_server_id = extractAttribute(md, LiteralStringRef("ReasonServerID"));
std::string reason_server_id = md.getValue("ReasonServerID");
if (!reason_server_id.empty())
perfLimit["reason_server_id"] = reason_server_id;
}
@ -1467,7 +1358,7 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
// Reads
try {
ErrorOr<vector<std::pair<StorageServerInterface, std::string>>> storageServers = wait(storageServerFuture);
ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>> storageServers = wait(storageServerFuture);
if(!storageServers.present()) {
throw storageServers.getError();
}
@ -1477,9 +1368,9 @@ ACTOR static Future<StatusObject> workloadStatusFetcher(Reference<AsyncVar<struc
StatusObject readBytes = makeCounter();
for(auto &ss : storageServers.get()) {
reads = addCounters(reads, parseCounter(extractAttribute(ss.second, LiteralStringRef("FinishedQueries"))));
readKeys = addCounters(readKeys, parseCounter(extractAttribute(ss.second, LiteralStringRef("RowsQueried"))));
readBytes = addCounters(readBytes, parseCounter(extractAttribute(ss.second, LiteralStringRef("BytesQueried"))));
reads = addCounters(reads, parseCounter(ss.second.getValue("FinishedQueries")));
readKeys = addCounters(readKeys, parseCounter(ss.second.getValue("RowsQueried")));
readBytes = addCounters(readBytes, parseCounter(ss.second.getValue("BytesQueried")));
}
operationsObj["reads"] = reads;
@ -1839,8 +1730,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
state std::map<std::string, StatusObject> processIssues = getProcessIssuesAsMessages(workerIssues);
state vector<std::pair<StorageServerInterface, std::string>> storageServers;
state vector<std::pair<TLogInterface, std::string>> tLogs;
state vector<std::pair<StorageServerInterface, TraceEventFields>> storageServers;
state vector<std::pair<TLogInterface, TraceEventFields>> tLogs;
state StatusObject qos;
state StatusObject data_overlay;
@ -1875,8 +1766,8 @@ ACTOR Future<StatusReply> clusterGetStatus(
state std::unordered_map<NetworkAddress, WorkerInterface> address_workers;
for (auto worker : workers)
address_workers[worker.first.address()] = worker.first;
state Future<ErrorOr<vector<std::pair<StorageServerInterface, std::string>>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers));
state Future<ErrorOr<vector<std::pair<TLogInterface, std::string>>>> tLogFuture = errorOr(getTLogsAndMetrics(db, address_workers));
state Future<ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>>> storageServerFuture = errorOr(getStorageServersAndMetrics(cx, address_workers));
state Future<ErrorOr<vector<std::pair<TLogInterface, TraceEventFields>>>> tLogFuture = errorOr(getTLogsAndMetrics(db, address_workers));
state int minReplicasRemaining = -1;
std::vector<Future<StatusObject>> futures2;
@ -1927,7 +1818,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
}
// Need storage servers now for processStatusFetcher() below.
ErrorOr<vector<std::pair<StorageServerInterface, std::string>>> _storageServers = wait(storageServerFuture);
ErrorOr<vector<std::pair<StorageServerInterface, TraceEventFields>>> _storageServers = wait(storageServerFuture);
if (_storageServers.present()) {
storageServers = _storageServers.get();
}
@ -1935,7 +1826,7 @@ ACTOR Future<StatusReply> clusterGetStatus(
messages.push_back(makeMessage("storage_servers_error", "Timed out trying to retrieve storage servers."));
// ...also tlogs
ErrorOr<vector<std::pair<TLogInterface, std::string>>> _tLogs = wait(tLogFuture);
ErrorOr<vector<std::pair<TLogInterface, TraceEventFields>>> _tLogs = wait(tLogFuture);
if (_tLogs.present()) {
tLogs = _tLogs.get();
}

View File

@ -30,7 +30,6 @@
typedef std::map< NetworkAddress, std::pair<std::string,UID> > ProcessIssuesMap;
typedef std::map< NetworkAddress, Standalone<VectorRef<ClientVersionRef>> > ClientVersionMap;
std::string extractAttribute( std::string const& expanded, std::string const& attributeToExtract );
Future<StatusReply> clusterGetStatus( Reference<AsyncVar<struct ServerDBInfo>> const& db, Database const& cx, vector<std::pair<WorkerInterface, ProcessClass>> const& workers,
ProcessIssuesMap const& workerIssues, ProcessIssuesMap const& clientIssues, ClientVersionMap const& clientVersionMap, std::map<NetworkAddress, std::string> const& traceLogGroupMap,
ServerCoordinators const& coordinators, std::vector<NetworkAddress> const& incompatibleConnections, Version const& datacenterVersionDifference );

View File

@ -227,7 +227,7 @@ struct SetMetricsLogRateRequest {
struct EventLogRequest {
bool getLastError;
Standalone<StringRef> eventName;
ReplyPromise< Standalone<StringRef> > reply;
ReplyPromise< TraceEventFields > reply;
EventLogRequest() : getLastError(true) {}
explicit EventLogRequest( Standalone<StringRef> eventName ) : eventName( eventName ), getLastError( false ) {}

View File

@ -811,11 +811,11 @@ ACTOR Future<Void> workerServer( Reference<ClusterConnectionFile> connFile, Refe
}
}
when( EventLogRequest req = waitNext(interf.eventLogRequest.getFuture()) ) {
Standalone<StringRef> e;
TraceEventFields e;
if( req.getLastError )
e = StringRef( latestEventCache.getLatestError() );
e = latestEventCache.getLatestError();
else
e = StringRef( latestEventCache.get( req.eventName.toString() ) );
e = latestEventCache.get( req.eventName.toString() );
req.reply.send(e);
}
when( TraceBatchDumpRequest req = waitNext(interf.traceBatchDumpRequest.getFuture()) ) {

View File

@ -41,10 +41,10 @@ struct DDMetricsWorkload : TestWorkload {
WorkerInterface masterWorker = wait(getMasterWorker(cx, self->dbInfo));
TraceEvent("GetHighPriorityReliocationsInFlight").detail("Database", printable(cx->dbName)).detail("Stage", "ContactingMaster");
Standalone<StringRef> md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
TraceEventFields md = wait( timeoutError(masterWorker.eventLogRequest.getReply(
EventLogRequest( StringRef( cx->dbName.toString() + "/MovingData" ) ) ), 1.0 ) );
int relocations;
sscanf(extractAttribute(md.toString(), "HighPriorityRelocations").c_str(), "%d", &relocations);
sscanf(md.getValue("HighPriorityRelocations").c_str(), "%d", &relocations);
return relocations;
}

View File

@ -42,17 +42,17 @@ struct WorkerErrorsWorkload : TestWorkload {
virtual void getMetrics( vector<PerfMetric>& m ) {}
ACTOR Future< std::vector< std::string > > latestEventOnWorkers( std::vector<std::pair<WorkerInterface, ProcessClass>> workers ) {
state vector<Future<Standalone<StringRef>>> eventTraces;
ACTOR Future< std::vector< TraceEventFields > > latestEventOnWorkers( std::vector<std::pair<WorkerInterface, ProcessClass>> workers ) {
state vector<Future<TraceEventFields>> eventTraces;
for(int c = 0; c < workers.size(); c++) {
eventTraces.push_back( workers[c].first.eventLogRequest.getReply( EventLogRequest() ) );
}
Void _ = wait( timeoutError( waitForAll( eventTraces ), 2.0 ) );
vector<std::string> results;
vector<TraceEventFields> results;
for(int i = 0; i < eventTraces.size(); i++) {
results.push_back( eventTraces[i].get().toString() );
results.push_back( eventTraces[i].get() );
}
return results;
@ -60,9 +60,9 @@ struct WorkerErrorsWorkload : TestWorkload {
ACTOR Future<Void> _start(Database cx, WorkerErrorsWorkload *self) {
state vector<std::pair<WorkerInterface, ProcessClass>> workers = wait( getWorkers( self->dbInfo ) );
std::vector<std::string> errors = wait( self->latestEventOnWorkers( workers ) );
std::vector<TraceEventFields> errors = wait( self->latestEventOnWorkers( workers ) );
for(auto e : errors) {
printf("%s\n", e.c_str());
printf("%s\n", e.toString().c_str());
}
return Void();
}

187
flow/FileTraceLogWriter.cpp Normal file
View File

@ -0,0 +1,187 @@
/*
* FileTraceLogWriter.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FileTraceLogWriter.h"
#include "flow.h"
#include "ThreadHelper.actor.h"
// Platform shim: maps the low-level descriptor primitives used by FileTraceLogWriter
// (__open/__write/__close/__fsync) onto each platform's spelling, and defines the
// flags and permission mode used when creating a trace file.
// NOTE(review): if neither __unixish__ nor _WIN32 is defined, none of these macros exist.
#if defined(__unixish__)
#define __open ::open
#define __write ::write
#define __close ::close
#define __fsync ::fsync
// Write-only, create the file, and fail (EEXIST) if it already exists.
#define TRACEFILE_FLAGS O_WRONLY | O_CREAT | O_EXCL
#define TRACEFILE_MODE 0664
#elif defined(_WIN32)
#include <windows.h>
// windows.h defines max/min as macros; remove them again.
#undef max
#undef min
#include <io.h>
#include <stdio.h>
#include <sys/stat.h>
#define __open _open
#define __write _write
#define __close _close
// _commit is used as the Windows counterpart of fsync.
#define __fsync _commit
// Same semantics as the unix flags above, using the CRT's underscore-prefixed names.
#define TRACEFILE_FLAGS _O_WRONLY | _O_CREAT | _O_EXCL
#define TRACEFILE_MODE _S_IWRITE
#endif
#include <fcntl.h>
// Constructs a writer that is not yet attached to a file: traceFileFD starts at -1 and
// index at 0 (open() pre-increments index, so the first file tried uses index 1).
//   directory   - directory scanned by cleanupTraceFiles() for old trace files
//   processName - filename prefix that identifies this process's trace files during cleanup
//   basename    - path prefix used by open() to build "<basename>.<index>.<extension>"
//   extension   - file extension, without the leading dot
//   maxLogsSize - byte budget for retained trace files; 0 disables size-based cleanup
//   onError     - callback invoked by lastError() for any error other than 0 or EINTR
FileTraceLogWriter::FileTraceLogWriter(std::string directory, std::string processName, std::string basename, std::string extension, uint64_t maxLogsSize, std::function<void()> onError)
: directory(directory), processName(processName), basename(basename), extension(extension), maxLogsSize(maxLogsSize), traceFileFD(-1), index(0), onError(onError) {}
// Adds a reference by forwarding to the ReferenceCounted<FileTraceLogWriter> base.
// NOTE(review): presumably this satisfies a reference-counting hook declared on ITraceLogWriter — confirm in Trace.h.
void FileTraceLogWriter::addref() {
ReferenceCounted<FileTraceLogWriter>::addref();
}
// Drops a reference by forwarding to the ReferenceCounted<FileTraceLogWriter> base.
// NOTE(review): presumably this satisfies a reference-counting hook declared on ITraceLogWriter — confirm in Trace.h.
void FileTraceLogWriter::delref() {
ReferenceCounted<FileTraceLogWriter>::delref();
}
// Reports the errno-style outcome of the most recent low-level file operation.
// A value of 0 (success) or EINTR (interrupted, will be retried) is ignored; anything
// else is treated as a serious error and triggers the onError callback.
void FileTraceLogWriter::lastError(int err) {
	const bool harmless = (err == 0) || (err == EINTR);
	if (harmless) {
		return;
	}

	// On a serious trace-log write error, every flush barrier posted between the failing
	// operation and the error itself is released, even though nothing was actually
	// flushed. Without this, a permanent write error would block the program forever.
	onError();
}
// Writes the whole of `str` to the current trace file descriptor, looping over partial
// writes. A write that makes no progress is reported through lastError(errno) and
// retried after a 0.1 second sleep, so this call does not return until every byte
// has been accepted.
void FileTraceLogWriter::write(const std::string& str) {
	const char* cursor = str.c_str();
	for (int bytesLeft = str.size(); bytesLeft != 0; ) {
		const int written = __write( traceFileFD, cursor, bytesLeft );
		if ( written <= 0 ) {
			// No progress: surface the error, back off briefly, then try again.
			lastError(errno);
			threadSleep(0.1);
			continue;
		}
		lastError(0);
		bytesLeft -= written;
		cursor += written;
	}
}
// Opens a fresh trace file named "<basename>.<index>.<extension>" and stores its
// descriptor in traceFileFD.
//
// Old trace files are pruned first (cleanupTraceFiles). The file is created exclusively
// (TRACEFILE_FLAGS), so if the candidate name already exists the index is bumped and the
// next name is tried immediately. Any other failure is printed to stderr, recorded as a
// tracked "TraceFileOpenError" event on the main thread, and retried after
// TRACE_RETRY_OPEN_INTERVAL; the call does not return until a file has been opened.
// On success the tracked "TraceFileOpenError" event is cleared.
void FileTraceLogWriter::open() {
	cleanupTraceFiles();

	auto finalname = format("%s.%d.%s", basename.c_str(), ++index, extension.c_str());
	while ( (traceFileFD = __open( finalname.c_str(), TRACEFILE_FLAGS, TRACEFILE_MODE )) == -1 ) {
		// Capture errno right away: lastError() may run the onError callback and fprintf()
		// performs I/O, and either may overwrite errno before it is inspected or reported.
		const int errorNum = errno;
		lastError(errorNum);
		if (errorNum == EEXIST)
			finalname = format("%s.%d.%s", basename.c_str(), ++index, extension.c_str());
		else {
			fprintf(stderr, "ERROR: could not create trace log file `%s' (%d: %s)\n", finalname.c_str(), errorNum, strerror(errorNum));

			onMainThreadVoid([finalname, errorNum]{
				TraceEvent(SevWarnAlways, "TraceFileOpenError")
					.detail("Filename", finalname)
					.detail("ErrorCode", errorNum)
					.detail("Error", strerror(errorNum))
					.trackLatest("TraceFileOpenError"); }, NULL);
			threadSleep(FLOW_KNOBS->TRACE_RETRY_OPEN_INTERVAL);
		}
	}
	onMainThreadVoid([]{ latestEventCache.clear("TraceFileOpenError"); }, NULL);
	lastError(0);
}
// Closes the current trace file, if one is open, and marks the writer as closed.
//
// A failing close is retried every 0.1 seconds, except when the descriptor is reported
// as invalid (EBADF): in that case there is nothing left to close and retrying would
// never succeed. traceFileFD is reset to -1 afterwards so that a repeated close() is a
// no-op instead of acting on a stale (possibly reused) descriptor.
void FileTraceLogWriter::close() {
	if (traceFileFD < 0) {
		return;
	}
	while ( __close(traceFileFD) ) {
		if (errno == EBADF) {
			break;
		}
		threadSleep(0.1);
	}
	traceFileFD = -1;
}
// Rolls over to a new trace file: closes the current one, then opens the next.
// open() pre-increments the index, so the new file gets a higher index than the old one.
void FileTraceLogWriter::roll() {
close();
open();
}
// Asks the OS to commit the trace file's buffered data via __fsync
// (::fsync on unix-like platforms, _commit on Windows).
// NOTE(review): unlike close(), this does not check traceFileFD >= 0, and the return
// value is ignored — confirm callers only sync while a file is open.
void FileTraceLogWriter::sync() {
__fsync(traceFileFD);
}
// Splits a trace file name of the form "<root>.<index>.<ext>" into its root and index.
//
//   filename - trace file name to parse
//   root     - receives everything before the '.' that precedes the index; if no such
//              '.' is found, receives the whole filename
//   index    - receives the parsed numeric index, or -1 if that segment is not a number
//
// The fixed offsets (5 and 4) assume the name ends in a '.' followed by a
// three-character extension.
void FileTraceLogWriter::extractTraceFileNameInfo(std::string const& filename, std::string &root, int &index) {
	int split = filename.find_last_of('.', filename.size() - 5);
	root = filename.substr(0, split);
	// sscanf returns EOF only when input ends before any conversion; a non-numeric segment
	// yields 0 and leaves `index` untouched. Anything other than one successful conversion
	// must therefore map to -1 so callers never read an unset index.
	if(sscanf(filename.substr(split + 1, filename.size() - split - 4).c_str(), "%d", &index) != 1) {
		index = -1;
	}
}
// Strict-weak ordering for trace file names: compares by root first, then by numeric
// index, and finally by the full name as a tie-breaker. Returns true if f1 sorts
// before f2.
bool FileTraceLogWriter::compareTraceFileName (std::string const& f1, std::string const& f2) {
	std::string root1;
	std::string root2;

	// Start from -1 so the comparison stays well-defined (and consistent for std::sort)
	// even if the extractor does not assign an index for a malformed name.
	int index1 = -1;
	int index2 = -1;

	extractTraceFileNameInfo(f1, root1, index1);
	extractTraceFileNameInfo(f2, root2, index2);

	if(root1 != root2)
		return root1 < root2;
	if(index1 != index2)
		return index1 < index2;

	return f1 < f2;
}
// Descending counterpart of compareTraceFileName: returns true if f1 sorts after f2.
// Used by cleanupTraceFiles() to sort the newest (highest-index) files first.
bool FileTraceLogWriter::reverseCompareTraceFileName(std::string f1, std::string f2) {
return compareTraceFileName(f2, f1);
}
// Deletes this process's oldest trace files so that the retained ones fit within
// roughly maxLogsSize bytes. Does nothing in simulation or when maxLogsSize is 0.
// Errors raised while listing, sizing or deleting files are swallowed: cleanup is
// best-effort.
void FileTraceLogWriter::cleanupTraceFiles() {
	// Setting maxLogsSize=0 disables trace file cleanup based on dir size
	if (g_network->isSimulated() || maxLogsSize == 0) {
		return;
	}

	try {
		const std::vector<std::string> listed = platform::listFiles(directory, extension);

		// Only files whose names begin with this process's name are candidates.
		std::vector<std::string> ownTraceFiles;
		for (const auto& name : listed) {
			if (name.compare(0, processName.length(), processName) == 0) {
				ownTraceFiles.push_back(name);
			}
		}

		// reverse sort, so we preserve the most recent files and delete the oldest
		std::sort(ownTraceFiles.begin(), ownTraceFiles.end(), FileTraceLogWriter::reverseCompareTraceFileName);

		// Walk past the newest files until the size budget is used up ...
		int64_t keptBytes = 0;
		auto cursor = ownTraceFiles.begin();
		for (; keptBytes < maxLogsSize && cursor != ownTraceFiles.end(); ++cursor) {
			keptBytes += (fileSize(joinPath(directory, *cursor)) + FLOW_KNOBS->ZERO_LENGTH_FILE_PAD);
		}

		// ... and delete everything older than that.
		for (; cursor != ownTraceFiles.end(); ++cursor) {
			deleteFile(joinPath(directory, *cursor));
		}
	} catch( Error & ) {}
}

63
flow/FileTraceLogWriter.h Normal file
View File

@ -0,0 +1,63 @@
/*
* FileTraceLogWriter.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_FILE_TRACE_LOG_WRITER_H
#define FLOW_FILE_TRACE_LOG_WRITER_H
#pragma once
#include "FastRef.h"
#include "Trace.h"
// ITraceLogWriter implementation that writes trace output to numbered files on disk,
// named "<basename>.<index>.<extension>", and prunes old files by total size.
class FileTraceLogWriter : public ITraceLogWriter, ReferenceCounted<FileTraceLogWriter> {
private:
// Directory scanned for existing trace files during cleanup.
std::string directory;
// Filename prefix identifying this process's trace files during cleanup.
std::string processName;
// Path prefix used to build each trace file name.
std::string basename;
// File extension, without the leading dot.
std::string extension;
// Byte budget for retained trace files; 0 disables size-based cleanup.
uint64_t maxLogsSize;
// Descriptor of the currently open trace file; initialized to -1.
int traceFileFD;
// Index of the most recently attempted file name; incremented before each open attempt.
int index;
// Invoked by lastError() for any error code other than 0 or EINTR.
std::function<void()> onError;
public:
FileTraceLogWriter(std::string directory, std::string processName, std::string basename, std::string extension, uint64_t maxLogsSize, std::function<void()> onError);
// Reference counting; both forward to the ReferenceCounted base.
void addref();
void delref();
// Reports an errno-style result; calls onError unless err is 0 or EINTR.
void lastError(int err);
// Writes the whole string to the current file, retrying until every byte is written.
void write(const std::string& str);
// Cleans up old files, then opens the next unused "<basename>.<index>.<extension>".
void open();
// Closes the current file if the descriptor is non-negative.
void close();
// close() followed by open().
void roll();
// Calls fsync (or the platform equivalent) on the current descriptor.
void sync();
// Splits a trace file name into its root and numeric index.
static void extractTraceFileNameInfo(std::string const& filename, std::string &root, int &index);
// Orders trace file names by root, then index, then full name.
static bool compareTraceFileName (std::string const& f1, std::string const& f2);
// Same ordering as compareTraceFileName with the arguments swapped (descending).
static bool reverseCompareTraceFileName(std::string f1, std::string f2);
// Deletes this process's oldest trace files once the size budget is exceeded.
void cleanupTraceFiles();
};
#endif

View File

@ -115,10 +115,12 @@ FlowKnobs::FlowKnobs(bool randomize, bool isSimulated) {
init( TRACE_RETRY_OPEN_INTERVAL, 1.00 );
init( MIN_TRACE_SEVERITY, isSimulated ? 0 : 10 ); // Related to the trace severity in Trace.h
init( MAX_TRACE_SUPPRESSIONS, 1e4 );
init( TRACE_FSYNC_ENABLED, 0 );
init( TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, 500 );
init( TRACE_EVENT_THROTLLER_SAMPLE_EXPIRY, 1800.0 ); // 30 mins
init( TRACE_EVENT_THROTTLER_MSG_LIMIT, 20000 );
init( TRACE_SYNC_ENABLED, 0 );
init( TRACE_EVENT_METRIC_UNITS_PER_SAMPLE, 500 );
init( TRACE_EVENT_THROTTLER_SAMPLE_EXPIRY, 1800.0 ); // 30 mins
init( TRACE_EVENT_THROTTLER_MSG_LIMIT, 20000 );
init( TRACE_EVENT_MAX_SIZE, 4000 );
init( TRACE_LOG_MAX_PREOPEN_BUFFER, 1000000 );
//TDMetrics
init( MAX_METRICS, 600 );

View File

@ -137,10 +137,12 @@ public:
double TRACE_RETRY_OPEN_INTERVAL;
int MIN_TRACE_SEVERITY;
int MAX_TRACE_SUPPRESSIONS;
int TRACE_FSYNC_ENABLED;
int TRACE_SYNC_ENABLED;
int TRACE_EVENT_METRIC_UNITS_PER_SAMPLE;
int TRACE_EVENT_THROTLLER_SAMPLE_EXPIRY;
int TRACE_EVENT_THROTTLER_SAMPLE_EXPIRY;
int TRACE_EVENT_THROTTLER_MSG_LIMIT;
int TRACE_EVENT_MAX_SIZE;
int TRACE_LOG_MAX_PREOPEN_BUFFER;
//TDMetrics
int64_t MAX_METRIC_SIZE;

View File

@ -2317,6 +2317,7 @@ extern void flushTraceFileVoid();
extern "C" void flushAndExit(int exitCode) {
flushTraceFileVoid();
fflush(stdout);
closeTraceFile();
#ifdef _WIN32
// This function is documented as being asynchronous, but we suspect it might actually be synchronous in the
// case that it is passed a handle to the current process. If not, then there may be cases where we escalate

File diff suppressed because it is too large Load Diff

View File

@ -52,6 +52,55 @@ enum Severity {
SevMax=1000000
};
// Container for the fields of a trace event, stored as an ordered sequence of
// (key, value) string pairs. The member definitions are not in this header.
class TraceEventFields {
public:
// One field: first is the key, second is the value.
typedef std::pair<std::string, std::string> Field;
typedef std::vector<Field> FieldContainer;
typedef FieldContainer::const_iterator FieldIterator;
TraceEventFields();
// NOTE(review): presumably the number of fields (it is what save() serializes as the count) — confirm in Trace.cpp.
size_t size() const;
// NOTE(review): presumably the accumulated byte size tracked in `bytes` — confirm in Trace.cpp.
size_t sizeBytes() const;
// Read-only iteration over the fields.
FieldIterator begin() const;
FieldIterator end() const;
// Adds a field; the rvalue overload allows the strings to be moved in.
void addField(const std::string& key, const std::string& value);
void addField(std::string&& key, std::string&& value);
// Positional access to a field.
const Field &operator[] (int index) const;
// Looks up `key`, writing the result to outValue and returning whether it was found.
// NOTE(review): inferred from the name and from callers that test the bool — confirm in Trace.cpp.
bool tryGetValue(std::string key, std::string &outValue) const;
// Returns the value for `key`.
// NOTE(review): behavior when the key is absent is not visible here — confirm in Trace.cpp.
std::string getValue(std::string key) const;
std::string toString() const;
void validateFormat() const;
private:
FieldContainer fields;
size_t bytes;
};
// Deserializes a TraceEventFields from `ar`: reads a 32-bit field count followed by
// that many (key, value) string pairs, appending each pair to `value` in order.
template <class Archive>
inline void load( Archive& ar, TraceEventFields& value ) {
	uint32_t remaining;
	ar >> remaining;

	std::string key;
	std::string val;
	for (; remaining != 0; --remaining) {
		ar >> key >> val;
		value.addField(key, val);
	}
}
// Serializes a TraceEventFields to `ar`: writes the field count as a 32-bit integer,
// then each field's key and value in iteration order (the format load() reads back).
template <class Archive>
inline void save( Archive& ar, const TraceEventFields& value ) {
	ar << (uint32_t)value.size();

	// Iterate by const reference: each element is a pair of std::string, and iterating
	// by value would copy both strings for every field.
	for(const auto& field : value) {
		ar << field.first << field.second;
	}
}
class TraceBatch {
public:
void addEvent( const char *name, uint64_t id, const char *location );
@ -61,30 +110,18 @@ public:
private:
struct EventInfo {
double time;
const char *name;
uint64_t id;
const char *location;
EventInfo(double time, const char *name, uint64_t id, const char *location) : time(time), name(name), id(id), location(location) {}
TraceEventFields fields;
EventInfo(double time, const char *name, uint64_t id, const char *location);
};
struct AttachInfo {
double time;
const char *name;
uint64_t id;
uint64_t to;
AttachInfo(double time, const char *name, uint64_t id, uint64_t to) : time(time), name(name), id(id), to(to) {}
TraceEventFields fields;
AttachInfo(double time, const char *name, uint64_t id, uint64_t to);
};
struct BuggifyInfo {
double time;
int activated;
int line;
std::string file;
BuggifyInfo(double time, int activated, int line, std::string file) : time(time), activated(activated), line(line), file(file) {}
TraceEventFields fields;
BuggifyInfo(double time, int activated, int line, std::string file);
};
std::vector<EventInfo> eventBatch;
@ -97,7 +134,6 @@ class StringRef;
template <class T> class Standalone;
template <class T> class Optional;
#if 1
struct TraceEvent {
TraceEvent( const char* type, UID id = UID() ); // Assumes SevInfo severity
TraceEvent( Severity, const char* type, UID id = UID() );
@ -112,29 +148,28 @@ struct TraceEvent {
static void setNetworkThread();
static bool isNetworkThread();
TraceEvent& error(class Error const& e, bool includeCancelled=false);
TraceEvent& error(const class Error& e, bool includeCancelled=false);
TraceEvent& detail( const char* key, const char* value );
TraceEvent& detail( const char* key, const std::string& value );
TraceEvent& detail( const char* key, double value );
TraceEvent& detail( const char* key, long int value );
TraceEvent& detail( const char* key, long unsigned int value );
TraceEvent& detail( const char* key, long long int value );
TraceEvent& detail( const char* key, long long unsigned int value );
TraceEvent& detail( const char* key, int value );
TraceEvent& detail( const char* key, unsigned value );
TraceEvent& detail( const char* key, struct NetworkAddress const& value );
TraceEvent& detailf( const char* key, const char* valueFormat, ... );
TraceEvent& detailext(const char* key, StringRef const& value);
TraceEvent& detailext(const char* key, Optional<Standalone<StringRef>> const& value);
TraceEvent& detail( std::string key, std::string value );
TraceEvent& detail( std::string key, double value );
TraceEvent& detail( std::string key, long int value );
TraceEvent& detail( std::string key, long unsigned int value );
TraceEvent& detail( std::string key, long long int value );
TraceEvent& detail( std::string key, long long unsigned int value );
TraceEvent& detail( std::string key, int value );
TraceEvent& detail( std::string key, unsigned value );
TraceEvent& detail( std::string key, const struct NetworkAddress& value );
TraceEvent& detailf( std::string key, const char* valueFormat, ... );
TraceEvent& detailext( std::string key, const StringRef& value );
TraceEvent& detailext( std::string key, const Optional<Standalone<StringRef>>& value );
private:
// Private version of _detailf that does NOT write to the eventMetric. This is to be used by other detail methods
// Private version of detailf that does NOT write to the eventMetric. This is to be used by other detail methods
// which can write field metrics of a more appropriate type than string but use detailf() to add to the TraceEvent.
TraceEvent& _detailf( const char* key, const char* valueFormat, ... );
TraceEvent& detailfNoMetric( std::string&& key, const char* valueFormat, ... );
TraceEvent& detailImpl( std::string&& key, std::string&& value, bool writeEventMetricField=true );
public:
TraceEvent& detailfv( const char* key, const char* valueFormat, va_list args, bool writeEventMetricField);
TraceEvent& detail( const char* key, UID const& value );
TraceEvent& backtrace(std::string prefix = "");
TraceEvent& detail( std::string key, const UID& value );
TraceEvent& backtrace(const std::string& prefix = "");
TraceEvent& trackLatest( const char* trackingKey );
TraceEvent& sample( double sampleRate, bool logSampleRate=true );
TraceEvent& suppressFor( double duration, bool logSuppressedEventCount=true );
@ -151,7 +186,7 @@ public:
private:
bool enabled;
std::string trackingKey;
char buffer[4000];
TraceEventFields fields;
int length;
Severity severity;
const char *type;
@ -162,45 +197,28 @@ private:
bool init( Severity, const char* type );
bool init( Severity, struct TraceInterval& );
void write( int length, const void* data );
void writef( const char* format, ... );
void writeEscaped( const char* data );
void writeEscapedfv( const char* format, va_list args );
};
#else
struct TraceEvent {
TraceEvent(const char* type, UID id = UID()) {}
TraceEvent(Severity, const char* type, UID id = UID()) {}
TraceEvent(struct TraceInterval&, UID id = UID()) {}
TraceEvent(const char* type, StringRef& const id); {} // Assumes SevInfo severity
TraceEvent(Severity, const char* type, StringRef& const id); {}
static bool isEnabled(const char* type) { return false; }
struct ITraceLogWriter {
virtual void open() = 0;
virtual void roll() = 0;
virtual void close() = 0;
virtual void write(const std::string&) = 0;
virtual void sync() = 0;
TraceEvent& error(class Error const& e, bool includeCancelled = false) { return *this; }
TraceEvent& detail(const char* key, const char* value) { return *this; }
TraceEvent& detail(const char* key, const std::string& value) { return *this; }
TraceEvent& detail(const char* key, double value) { return *this; }
TraceEvent& detail(const char* key, long int value) { return *this; }
TraceEvent& detail(const char* key, long unsigned int value) { return *this; }
TraceEvent& detail(const char* key, long long int value) { return *this; }
TraceEvent& detail(const char* key, long long unsigned int value) { return *this; }
TraceEvent& detail(const char* key, int value) { return *this; }
TraceEvent& detail(const char* key, unsigned value) { return *this; }
TraceEvent& detail(const char* key, struct NetworkAddress const& value) { return *this; }
TraceEvent& detailf(const char* key, const char* valueFormat, ...) { return *this; }
TraceEvent& detailfv(const char* key, const char* valueFormat, va_list args) { return *this; }
TraceEvent& detail(const char* key, UID const& value) { return *this; }
TraceEvent& detailext(const char* key, StringRef const& value) { return *this; }
TraceEvent& detailext(const char* key, Optional<Standalone<StringRef>> const& value); { return *this; }
TraceEvent& backtrace(std::string prefix = "") { return *this; }
TraceEvent& trackLatest(const char* trackingKey) { return *this; }
TraceEvent& GetLastError() { return *this; }
virtual void addref() = 0;
virtual void delref() = 0;
};
// Interface for turning a TraceEventFields into the text that is written to a trace file,
// together with the fixed text that surrounds the events in each file.
struct ITraceLogFormatter {
// NOTE(review): presumably the file-name extension for this format (the XML formatter returns "xml") - confirm against the writer
virtual const char* getExtension() = 0;
virtual const char* getHeader() = 0; // Called when starting a new file
virtual const char* getFooter() = 0; // Called when ending a file
virtual std::string formatEvent(const TraceEventFields&) = 0; // Called for each event
// Reference-count hooks supplied by the concrete formatter
virtual void addref() = 0;
virtual void delref() = 0;
};
#endif
struct TraceInterval {
TraceInterval( const char* type ) : count(-1), type(type), severity(SevInfo) {}
@ -216,20 +234,20 @@ struct TraceInterval {
struct LatestEventCache {
public:
void set( std::string tag, std::string contents );
std::string get( std::string tag );
std::vector<std::string> getAll();
std::vector<std::string> getAllUnsafe();
void set( std::string tag, const TraceEventFields& fields );
TraceEventFields get( std::string tag );
std::vector<TraceEventFields> getAll();
std::vector<TraceEventFields> getAllUnsafe();
void clear( std::string prefix );
void clear();
// Latest error tracking only tracks errors when called from the main thread. Other errors are silently ignored.
void setLatestError( std::string contents );
std::string getLatestError();
void setLatestError( const TraceEventFields& contents );
TraceEventFields getLatestError();
private:
std::map<NetworkAddress, std::map<std::string, std::string>> latest;
std::map<NetworkAddress, std::string> latestErrors;
std::map<NetworkAddress, std::map<std::string, TraceEventFields>> latest;
std::map<NetworkAddress, TraceEventFields> latestErrors;
};
extern LatestEventCache latestEventCache;

View File

@ -0,0 +1,95 @@
/*
* XmlTraceLogFormatter.cpp
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "actorcompiler.h"
#include "XmlTraceLogFormatter.h"
// Forwards to ReferenceCounted<XmlTraceLogFormatter>::addref(); this is the concrete
// implementation of the addref() declared by ITraceLogFormatter.
void XmlTraceLogFormatter::addref() {
ReferenceCounted<XmlTraceLogFormatter>::addref();
}
// Forwards to ReferenceCounted<XmlTraceLogFormatter>::delref(); this is the concrete
// implementation of the delref() declared by ITraceLogFormatter.
void XmlTraceLogFormatter::delref() {
ReferenceCounted<XmlTraceLogFormatter>::delref();
}
// Returns the extension string for this format, "xml" (no leading dot).
const char* XmlTraceLogFormatter::getExtension() {
return "xml";
}
// Returns the text that begins a trace file: the XML declaration followed by the opening
// <Trace> element, each terminated with CRLF.
const char* XmlTraceLogFormatter::getHeader() {
return "<?xml version=\"1.0\"?>\r\n<Trace>\r\n";
}
// Returns the text that ends a trace file: the closing </Trace> element terminated with CRLF.
const char* XmlTraceLogFormatter::getFooter() {
return "</Trace>\r\n";
}
void XmlTraceLogFormatter::escape(std::stringstream &ss, std::string source) {
loop {
int index = source.find_first_of(std::string({'&', '"', '<', '>', '\r', '\n', '\0'}));
if(index == source.npos) {
break;
}
ss << source.substr(0, index);
if(source[index] == '&') {
ss << "&amp;";
}
else if(source[index] == '"') {
ss << "&quot;";
}
else if(source[index] == '<') {
ss << "&lt;";
}
else if(source[index] == '>') {
ss << "&gt;";
}
else if(source[index] == '\n' || source[index] == '\r') {
ss << " ";
}
else if(source[index] == '\0') {
ss << " ";
TraceEvent(SevWarnAlways, "StrippedIllegalCharacterFromTraceEvent").detail("Source", StringRef(source).printable()).detail("Character", StringRef(source.substr(index, 1)).printable());
}
else {
ASSERT(false);
}
source = source.substr(index+1);
}
ss << source;
}
// Renders one event as a single self-closing element terminated with CRLF:
//   <Event key1="value1" key2="value2" />
// Keys and values are both passed through escape(); fields appear in iteration order.
std::string XmlTraceLogFormatter::formatEvent(const TraceEventFields &fields) {
	std::stringstream ss;
	ss << "<Event ";

	// Iterate by const reference; iterating by value would copy both strings of every field
	// before escape() takes its own copy.
	for(const auto& field : fields) {
		escape(ss, field.first);
		ss << "=\"";
		escape(ss, field.second);
		ss << "\" ";
	}

	ss << "/>\r\n";
	return ss.str();
}

View File

@ -0,0 +1,43 @@
/*
* XmlTraceLogFormatter.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2018 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef FLOW_XML_TRACE_LOG_FORMATTER_H
#define FLOW_XML_TRACE_LOG_FORMATTER_H
#pragma once
#include <sstream>
#include "FastRef.h"
#include "Trace.h"
// ITraceLogFormatter implementation that writes trace events as XML: each event becomes one
// self-closing <Event .../> element whose attributes are the event's fields, and each file
// is wrapped in a <Trace> element.
//
// The ITraceLogFormatter overrides are marked `override` so that a signature drifting away
// from the interface is a compile error instead of a silently unrelated function.
struct XmlTraceLogFormatter : public ITraceLogFormatter, ReferenceCounted<XmlTraceLogFormatter> {
	// Reference counting, forwarded to ReferenceCounted<XmlTraceLogFormatter>
	void addref() override;
	void delref() override;

	const char* getExtension() override;
	const char* getHeader() override;
	const char* getFooter() override;

	// Appends source to ss with &, ", < and > replaced by XML entities and \r, \n and \0 replaced by a space
	void escape(std::stringstream &ss, std::string source);

	// Returns the <Event .../> element for one event
	std::string formatEvent(const TraceEventFields &fields) override;
};
#endif

View File

@ -118,7 +118,7 @@ ERROR( read_version_already_set, 2010, "Transaction already has a read version s
ERROR( version_invalid, 2011, "Version not valid" )
ERROR( range_limits_invalid, 2012, "Range limits not valid" )
ERROR( invalid_database_name, 2013, "Database name must be 'DB'" )
ERROR( attribute_not_found, 2014, "Attribute not found in string" )
ERROR( attribute_not_found, 2014, "Attribute not found" )
ERROR( future_not_set, 2015, "Future not ready" )
ERROR( future_not_error, 2016, "Future not an error" )
ERROR( used_during_commit, 2017, "Operation issued while a commit was outstanding" )

View File

@ -93,38 +93,52 @@ Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_un
return ret;
}
std::string format( const char* form, ... ) {
int vsformat( std::string &outputString, const char* form, va_list args) {
char buf[200];
va_list args;
va_start(args, form);
int size = vsnprintf(buf, sizeof(buf), form, args);
va_end(args);
va_list args2;
va_copy(args2, args);
int size = vsnprintf(buf, sizeof(buf), form, args2);
va_end(args2);
if(size >= 0 && size < sizeof(buf)) {
return std::string(buf, size);
outputString = std::string(buf, size);
return size;
}
#ifdef _WIN32
// Microsoft's non-standard vsnprintf doesn't return a correct size, but just an error, so determine the necessary size
va_start(args, form);
size = _vscprintf(form, args);
va_end(args);
va_copy(args2, args);
size = _vscprintf(form, args2);
va_end(args2);
#endif
if (size < 0) throw internal_error();
if (size < 0) {
return -1;
}
TEST(true); //large format result
std::string s;
s.resize(size + 1);
va_start(args, form);
size = vsnprintf(&s[0], s.size(), form, args);
va_end(args);
if (size < 0 || size >= s.size()) throw internal_error();
outputString.resize(size + 1);
size = vsnprintf(&outputString[0], outputString.size(), form, args);
if (size < 0 || size >= outputString.size()) {
return -1;
}
s.resize(size);
return s;
outputString.resize(size);
return size;
}
// printf-style formatting into a std::string. The variadic arguments are handed to
// vsformat(), which returns a negative number on failure; a failure trips the ASSERT.
std::string format( const char* form, ... ) {
	std::string formatted;

	va_list varArgs;
	va_start(varArgs, form);
	const int charsWritten = vsformat(formatted, form, varArgs);
	va_end(varArgs);

	ASSERT(charsWritten >= 0);
	return formatted;
}
Standalone<StringRef> strinc(StringRef const& str) {

View File

@ -68,6 +68,10 @@ bool validationIsEnabled();
extern Optional<uint64_t> parse_with_suffix(std::string toparse, std::string default_unit = "");
extern std::string format(const char* form, ...);
// On success, returns the number of characters written. On failure, returns a negative number.
extern int vsformat(std::string &outputString, const char* form, va_list args);
extern Standalone<StringRef> strinc(StringRef const& str);
extern StringRef strinc(StringRef const& str, Arena& arena);
extern Standalone<StringRef> addVersionStampAtEnd(StringRef const& str);

View File

@ -1,4 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="14.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|X64">
@ -18,6 +18,10 @@
<ClCompile Include="Error.cpp" />
<ClCompile Include="FastAlloc.cpp" />
<ClCompile Include="FaultInjection.cpp" />
<ClCompile Include="FileTraceLogWriter.cpp" />
<ClCompile Include="XmlTraceLogFormatter.cpp" />
<ClInclude Include="FileTraceLogWriter.h" />
<ClInclude Include="XmlTraceLogFormatter.h" />
<ClInclude Include="MetricSample.h" />
<ClInclude Include="Profiler.h" />
<ActorCompiler Include="Profiler.actor.cpp" />

View File

@ -36,6 +36,8 @@
<ClCompile Include="version.cpp" />
<ClCompile Include="stacktrace.amalgamation.cpp" />
<ClCompile Include="SignalSafeUnwind.cpp" />
<ClCompile Include="XmlTraceLogFormatter.cpp" />
<ClCompile Include="FileTraceLogWriter.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="ActorCollection.h" />
@ -72,6 +74,8 @@
<ClInclude Include="SignalSafeUnwind.h" />
<ClInclude Include="MetricSample.h" />
<ClInclude Include="stacktrace.h" />
<ClInclude Include="XmlTraceLogFormatter.h" />
<ClInclude Include="FileTraceLogWriter.h" />
</ItemGroup>
<ItemGroup>
<None Include="no_intellisense.opt" />