Merge pull request #1808 from ajbeamon/improved-transaction-metrics

Improve TransactionMetrics
This commit is contained in:
Meng Xu 2019-07-09 16:46:17 -07:00 committed by GitHub
commit cce00bb413
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 114 additions and 78 deletions

View File

@ -3244,7 +3244,7 @@ int main(int argc, char* argv[]) {
}
try {
db = Database::createDatabase(ccf, -1, localities);
db = Database::createDatabase(ccf, -1, true, localities);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());
@ -3266,7 +3266,7 @@ int main(int argc, char* argv[]) {
}
try {
sourceDb = Database::createDatabase(sourceCcf, -1, localities);
sourceDb = Database::createDatabase(sourceCcf, -1, true, localities);
}
catch (Error& e) {
fprintf(stderr, "ERROR: %s\n", e.what());

View File

@ -2516,7 +2516,7 @@ ACTOR Future<int> cli(CLIOptions opt, LineNoise* plinenoise) {
TraceEvent::setNetworkThread();
try {
db = Database::createDatabase(ccf, -1);
db = Database::createDatabase(ccf, -1, false);
if (!opt.exec.present()) {
printf("Using cluster file `%s'.\n", ccf->getFilename().c_str());
}

View File

@ -58,7 +58,7 @@ public:
~DatabaseContext();
Database clone() const { return Database(new DatabaseContext( cluster, clientInfo, clientInfoMonitor, dbId, taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion )); }
Database clone() const { return Database(new DatabaseContext( cluster, clientInfo, clientInfoMonitor, taskID, clientLocality, enableLocalityLoadBalance, lockAware, internal, apiVersion )); }
std::pair<KeyRange,Reference<LocationInfo>> getCachedLocation( const KeyRef&, bool isBackward = false );
bool getCachedLocations( const KeyRangeRef&, vector<std::pair<KeyRange,Reference<LocationInfo>>>&, int limit, bool reverse );
@ -97,8 +97,8 @@ public:
//private:
explicit DatabaseContext( Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientDBInfo,
Future<Void> clientInfoMonitor, Standalone<StringRef> dbId, TaskPriority taskID, LocalityData const& clientLocality,
bool enableLocalityLoadBalance, bool lockAware, int apiVersion = Database::API_VERSION_LATEST );
Future<Void> clientInfoMonitor, TaskPriority taskID, LocalityData const& clientLocality,
bool enableLocalityLoadBalance, bool lockAware, bool internal = true, int apiVersion = Database::API_VERSION_LATEST );
explicit DatabaseContext( const Error &err );
@ -133,22 +133,26 @@ public:
std::map< UID, StorageServerInfo* > server_interf;
Standalone<StringRef> dbId;
UID dbId;
bool internal; // Only contexts created through the C client and fdbcli are non-internal
CounterCollection cc;
Counter transactionReadVersions;
Counter transactionLogicalReads;
Counter transactionPhysicalReads;
Counter transactionCommittedMutations;
Counter transactionCommittedMutationBytes;
Counter transactionsCommitStarted;
Counter transactionsCommitCompleted;
Counter transactionsTooOld;
Counter transactionsFutureVersions;
Counter transactionsNotCommitted;
Counter transactionsMaybeCommitted;
Counter transactionsResourceConstrained;
Counter transactionsProcessBehind;
Counter transactionWaitsForFullRecovery;
int64_t transactionReadVersions;
int64_t transactionLogicalReads;
int64_t transactionPhysicalReads;
int64_t transactionCommittedMutations;
int64_t transactionCommittedMutationBytes;
int64_t transactionsCommitStarted;
int64_t transactionsCommitCompleted;
int64_t transactionsTooOld;
int64_t transactionsFutureVersions;
int64_t transactionsNotCommitted;
int64_t transactionsMaybeCommitted;
int64_t transactionsResourceConstrained;
int64_t transactionsProcessBehind;
int64_t transactionWaitsForFullRecovery;
ContinuousSample<double> latencies, readLatencies, commitLatencies, GRVLatencies, mutationsPerCommit, bytesPerCommit;
int outstandingWatches;

View File

@ -208,24 +208,18 @@ template <> void addref( DatabaseContext* ptr ) { ptr->addref(); }
template <> void delref( DatabaseContext* ptr ) { ptr->delref(); }
ACTOR Future<Void> databaseLogger( DatabaseContext *cx ) {
state double lastLogged = 0;
loop {
wait( delay( CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, cx->taskID ) );
TraceEvent("TransactionMetrics")
wait(delay(CLIENT_KNOBS->SYSTEM_MONITOR_INTERVAL, cx->taskID));
TraceEvent ev("TransactionMetrics", cx->dbId);
ev.detail("Elapsed", (lastLogged == 0) ? 0 : now() - lastLogged)
.detail("Cluster", cx->cluster && cx->getConnectionFile() ? cx->getConnectionFile()->getConnectionString().clusterKeyName().toString() : "")
.detail("ReadVersions", cx->transactionReadVersions)
.detail("LogicalUncachedReads", cx->transactionLogicalReads)
.detail("PhysicalReadRequests", cx->transactionPhysicalReads)
.detail("CommittedMutations", cx->transactionCommittedMutations)
.detail("CommittedMutationBytes", cx->transactionCommittedMutationBytes)
.detail("CommitStarted", cx->transactionsCommitStarted)
.detail("CommitCompleted", cx->transactionsCommitCompleted)
.detail("TooOld", cx->transactionsTooOld)
.detail("FutureVersions", cx->transactionsFutureVersions)
.detail("NotCommitted", cx->transactionsNotCommitted)
.detail("MaybeCommitted", cx->transactionsMaybeCommitted)
.detail("ResourceConstrained", cx->transactionsResourceConstrained)
.detail("ProcessBehind", cx->transactionsProcessBehind)
.detail("MeanLatency", cx->latencies.mean())
.detail("Internal", cx->internal);
cx->cc.logToTraceEvent(ev);
ev.detail("MeanLatency", cx->latencies.mean())
.detail("MedianLatency", cx->latencies.median())
.detail("Latency90", cx->latencies.percentile(0.90))
.detail("Latency98", cx->latencies.percentile(0.98))
@ -245,12 +239,15 @@ ACTOR Future<Void> databaseLogger( DatabaseContext *cx ) {
.detail("MeanBytesPerCommit", cx->bytesPerCommit.mean())
.detail("MedianBytesPerCommit", cx->bytesPerCommit.median())
.detail("MaxBytesPerCommit", cx->bytesPerCommit.max());
cx->latencies.clear();
cx->readLatencies.clear();
cx->GRVLatencies.clear();
cx->commitLatencies.clear();
cx->mutationsPerCommit.clear();
cx->bytesPerCommit.clear();
lastLogged = now();
}
}
@ -508,18 +505,21 @@ ACTOR static Future<HealthMetrics> getHealthMetricsActor(DatabaseContext *cx, bo
Future<HealthMetrics> DatabaseContext::getHealthMetrics(bool detailed = false) {
return getHealthMetricsActor(this, detailed);
}
DatabaseContext::DatabaseContext(
Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, Standalone<StringRef> dbId,
TaskPriority taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, int apiVersion )
: cluster(cluster), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), dbId(dbId), taskID(taskID), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance),
lockAware(lockAware), apiVersion(apiVersion), provisional(false),
transactionReadVersions(0), transactionLogicalReads(0), transactionPhysicalReads(0), transactionCommittedMutations(0), transactionCommittedMutationBytes(0),
transactionsCommitStarted(0), transactionsCommitCompleted(0), transactionsTooOld(0), transactionsFutureVersions(0), transactionsNotCommitted(0),
transactionsMaybeCommitted(0), transactionsResourceConstrained(0), transactionsProcessBehind(0), outstandingWatches(0), transactionTimeout(0.0), transactionMaxRetries(-1),
Reference<Cluster> cluster, Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor,
TaskPriority taskID, LocalityData const& clientLocality, bool enableLocalityLoadBalance, bool lockAware, bool internal, int apiVersion )
: cluster(cluster), clientInfo(clientInfo), clientInfoMonitor(clientInfoMonitor), taskID(taskID), clientLocality(clientLocality), enableLocalityLoadBalance(enableLocalityLoadBalance),
lockAware(lockAware), apiVersion(apiVersion), provisional(false), cc("TransactionMetrics"),
transactionReadVersions("ReadVersions", cc), transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
transactionCommittedMutations("CommittedMutations", cc), transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionsCommitStarted("CommitStarted", cc),
transactionsCommitCompleted("CommitCompleted", cc), transactionsTooOld("TooOld", cc), transactionsFutureVersions("FutureVersions", cc),
transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc), transactionsResourceConstrained("ResourceConstrained", cc),
transactionsProcessBehind("ProcessBehind", cc), transactionWaitsForFullRecovery("WaitsForFullRecovery", cc), outstandingWatches(0), transactionTimeout(0.0), transactionMaxRetries(-1),
latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000), mvCacheInsertLocation(0),
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0)
healthMetricsLastUpdated(0), detailedHealthMetricsLastUpdated(0), internal(internal)
{
dbId = deterministicRandom()->randomUniqueID();
metadataVersionCache.resize(CLIENT_KNOBS->METADATA_VERSION_CACHE_SIZE);
maxOutstandingWatches = CLIENT_KNOBS->DEFAULT_MAX_OUTSTANDING_WATCHES;
@ -539,7 +539,14 @@ DatabaseContext::DatabaseContext(
clientStatusUpdater.actor = clientStatusUpdateActor(this);
}
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), latencies(1000), readLatencies(1000), commitLatencies(1000), GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000) {}
DatabaseContext::DatabaseContext( const Error &err ) : deferredError(err), cc("TransactionMetrics"),
transactionReadVersions("ReadVersions", cc), transactionLogicalReads("LogicalUncachedReads", cc), transactionPhysicalReads("PhysicalReadRequests", cc),
transactionCommittedMutations("CommittedMutations", cc), transactionCommittedMutationBytes("CommittedMutationBytes", cc), transactionsCommitStarted("CommitStarted", cc),
transactionsCommitCompleted("CommitCompleted", cc), transactionsTooOld("TooOld", cc), transactionsFutureVersions("FutureVersions", cc),
transactionsNotCommitted("NotCommitted", cc), transactionsMaybeCommitted("MaybeCommitted", cc), transactionsResourceConstrained("ResourceConstrained", cc),
transactionsProcessBehind("ProcessBehind", cc), transactionWaitsForFullRecovery("WaitsForFullRecovery", cc), latencies(1000), readLatencies(1000), commitLatencies(1000),
GRVLatencies(1000), mutationsPerCommit(1000), bytesPerCommit(1000),
internal(false) {}
ACTOR static Future<Void> monitorClientInfo( Reference<AsyncVar<Optional<ClusterInterface>>> clusterInterface, Reference<ClusterConnectionFile> ccf, Reference<AsyncVar<ClientDBInfo>> outInfo, Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed ) {
try {
@ -632,11 +639,11 @@ Database DatabaseContext::create(Reference<AsyncVar<Optional<ClusterInterface>>>
Reference<AsyncVar<ClientDBInfo>> clientInfo(new AsyncVar<ClientDBInfo>());
Future<Void> clientInfoMonitor = delayedAsyncVar(connectedCoordinatorsNum, connectedCoordinatorsNumDelayed, CLIENT_KNOBS->CHECK_CONNECTED_COORDINATOR_NUM_DELAY) || monitorClientInfo(clusterInterface, connFile, clientInfo, connectedCoordinatorsNumDelayed);
return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskPriority::DefaultEndpoint, clientLocality, true, false));
return Database(new DatabaseContext(cluster, clientInfo, clientInfoMonitor, TaskPriority::DefaultEndpoint, clientLocality, true, false, true));
}
Database DatabaseContext::create(Reference<AsyncVar<ClientDBInfo>> clientInfo, Future<Void> clientInfoMonitor, LocalityData clientLocality, bool enableLocalityLoadBalance, TaskPriority taskID, bool lockAware, int apiVersion) {
return Database( new DatabaseContext( Reference<Cluster>(nullptr), clientInfo, clientInfoMonitor, LiteralStringRef(""), taskID, clientLocality, enableLocalityLoadBalance, lockAware, apiVersion ) );
return Database( new DatabaseContext( Reference<Cluster>(nullptr), clientInfo, clientInfoMonitor, taskID, clientLocality, enableLocalityLoadBalance, lockAware, true, apiVersion ) );
}
DatabaseContext::~DatabaseContext() {
@ -816,7 +823,7 @@ Reference<ClusterConnectionFile> DatabaseContext::getConnectionFile() {
return cluster->getConnectionFile();
}
Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality, DatabaseContext *preallocatedDb ) {
Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, bool internal, LocalityData const& clientLocality, DatabaseContext *preallocatedDb ) {
Reference<AsyncVar<int>> connectedCoordinatorsNum(new AsyncVar<int>(0)); // Number of connected coordinators for the client
Reference<AsyncVar<int>> connectedCoordinatorsNumDelayed(new AsyncVar<int>(0));
Reference<Cluster> cluster(new Cluster(connFile, connectedCoordinatorsNum, apiVersion));
@ -825,18 +832,18 @@ Database Database::createDatabase( Reference<ClusterConnectionFile> connFile, in
DatabaseContext *db;
if(preallocatedDb) {
db = new (preallocatedDb) DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskPriority::DefaultEndpoint, clientLocality, true, false, apiVersion);
db = new (preallocatedDb) DatabaseContext(cluster, clientInfo, clientInfoMonitor, TaskPriority::DefaultEndpoint, clientLocality, true, false, internal, apiVersion);
}
else {
db = new DatabaseContext(cluster, clientInfo, clientInfoMonitor, LiteralStringRef(""), TaskPriority::DefaultEndpoint, clientLocality, true, false, apiVersion);
db = new DatabaseContext(cluster, clientInfo, clientInfoMonitor, TaskPriority::DefaultEndpoint, clientLocality, true, false, internal, apiVersion);
}
return Database(db);
}
Database Database::createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality ) {
Database Database::createDatabase( std::string connFileName, int apiVersion, bool internal, LocalityData const& clientLocality ) {
Reference<ClusterConnectionFile> rccf = Reference<ClusterConnectionFile>(new ClusterConnectionFile(ClusterConnectionFile::lookupClusterFileName(connFileName).first));
return Database::createDatabase(rccf, apiVersion, clientLocality);
return Database::createDatabase(rccf, apiVersion, internal, clientLocality);
}
extern IPAddress determinePublicIPAutomatically(ClusterConnectionString const& ccs);
@ -2718,7 +2725,7 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
tr->versionstampPromise.send(ret);
tr->numErrors = 0;
cx->transactionsCommitCompleted++;
++cx->transactionsCommitCompleted;
cx->transactionCommittedMutations += req.transaction.mutations.size();
cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize();
@ -2793,7 +2800,7 @@ Future<Void> Transaction::commitMutations() {
return Void();
}
cx->transactionsCommitStarted++;
++cx->transactionsCommitStarted;
if(options.readOnly)
return transaction_read_only();
@ -3126,7 +3133,7 @@ ACTOR Future<Version> extractReadVersion(DatabaseContext* cx, Reference<Transact
}
Future<Version> Transaction::getReadVersion(uint32_t flags) {
cx->transactionReadVersions++;
++cx->transactionReadVersions;
flags |= options.getReadVersionFlags;
auto& batcher = cx->versionBatcher[ flags ];
@ -3162,15 +3169,15 @@ Future<Void> Transaction::onError( Error const& e ) {
e.code() == error_code_cluster_not_fully_recovered)
{
if(e.code() == error_code_not_committed)
cx->transactionsNotCommitted++;
++cx->transactionsNotCommitted;
if(e.code() == error_code_commit_unknown_result)
cx->transactionsMaybeCommitted++;
++cx->transactionsMaybeCommitted;
if (e.code() == error_code_proxy_memory_limit_exceeded)
cx->transactionsResourceConstrained++;
++cx->transactionsResourceConstrained;
if (e.code() == error_code_process_behind)
cx->transactionsProcessBehind++;
++cx->transactionsProcessBehind;
if (e.code() == error_code_cluster_not_fully_recovered) {
cx->transactionWaitsForFullRecovery++;
++cx->transactionWaitsForFullRecovery;
}
double backoff = getBackoff(e.code());
@ -3181,9 +3188,9 @@ Future<Void> Transaction::onError( Error const& e ) {
e.code() == error_code_future_version)
{
if( e.code() == error_code_transaction_too_old )
cx->transactionsTooOld++;
++cx->transactionsTooOld;
else if( e.code() == error_code_future_version )
cx->transactionsFutureVersions++;
++cx->transactionsFutureVersions;
double maxBackoff = options.maxBackoff;
reset();

View File

@ -74,8 +74,8 @@ class Database {
public:
enum { API_VERSION_LATEST = -1 };
static Database createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, LocalityData const& clientLocality=LocalityData(), DatabaseContext *preallocatedDb=nullptr );
static Database createDatabase( std::string connFileName, int apiVersion, LocalityData const& clientLocality=LocalityData() );
static Database createDatabase( Reference<ClusterConnectionFile> connFile, int apiVersion, bool internal=true, LocalityData const& clientLocality=LocalityData(), DatabaseContext *preallocatedDb=nullptr );
static Database createDatabase( std::string connFileName, int apiVersion, bool internal=true, LocalityData const& clientLocality=LocalityData() );
Database() {} // an uninitialized database can be destructed or reassigned safely; that's it
void operator= ( Database const& rhs ) { db = rhs.db; }

View File

@ -68,7 +68,7 @@ ThreadSafeDatabase::ThreadSafeDatabase(std::string connFilename, int apiVersion)
onMainThreadVoid([db, connFile, apiVersion](){
try {
Database::createDatabase(connFile, apiVersion, LocalityData(), db).extractPtr();
Database::createDatabase(connFile, apiVersion, false, LocalityData(), db).extractPtr();
}
catch(Error &e) {
new (db) DatabaseContext(e);

View File

@ -24,7 +24,7 @@
#include "flow/actorcompiler.h" // This must be the last #include.
ACTOR Future<Void> restoreWorker(Reference<ClusterConnectionFile> ccf, LocalityData locality) {
state Database cx = Database::createDatabase(ccf->getFilename(), Database::API_VERSION_LATEST,locality);
state Database cx = Database::createDatabase(ccf->getFilename(), Database::API_VERSION_LATEST, true, locality);
state RestoreInterface interf;
interf.initEndpoints();
state Optional<RestoreInterface> leaderInterf;

View File

@ -505,7 +505,7 @@ ACTOR Future<Void> testerServerWorkload( WorkloadRequest work, Reference<Cluster
startRole(Role::TESTER, workIface.id(), UID(), details);
if( work.useDatabase ) {
cx = Database::createDatabase(ccf, -1, locality);
cx = Database::createDatabase(ccf, -1, true, locality);
wait( delay(1.0) );
}

View File

@ -747,7 +747,7 @@ ACTOR Future<Void> workerServer(
if(metricsPrefix.size() > 0) {
if( metricsConnFile.size() > 0) {
try {
state Database db = Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, locality);
state Database db = Database::createDatabase(metricsConnFile, Database::API_VERSION_LATEST, true, locality);
metricsLogger = runMetrics( db, KeyRef(metricsPrefix) );
} catch(Error &e) {
TraceEvent(SevWarnAlways, "TDMetricsBadClusterFile").error(e).detail("ConnFile", metricsConnFile);

View File

@ -69,6 +69,13 @@ void Counter::clear() {
metric = 0;
}
void CounterCollection::logToTraceEvent(TraceEvent &te) const {
for (ICounter* c : counters) {
te.detail(c->getName().c_str(), c);
c->resetInterval();
}
}
ACTOR Future<Void> traceCounters(std::string traceEventName, UID traceEventID, double interval, CounterCollection* counters, std::string trackLatestName) {
wait(delay(0)); // Give an opportunity for all members used in special counters to be initialized
@ -80,15 +87,12 @@ ACTOR Future<Void> traceCounters(std::string traceEventName, UID traceEventID, d
loop{
TraceEvent te(traceEventName.c_str(), traceEventID);
te.detail("Elapsed", now() - last_interval);
for (ICounter* c : counters->counters) {
if (c->hasRate() && c->hasRoughness())
te.detailf(c->getName().c_str(), "%g %g %lld", c->getRate(), c->getRoughness(), (long long)c->getValue());
else
te.detail(c->getName().c_str(), c->getValue());
c->resetInterval();
}
if (!trackLatestName.empty())
counters->logToTraceEvent(te);
if (!trackLatestName.empty()) {
te.trackLatest(trackLatestName.c_str());
}
last_interval = now();
wait(delay(interval));

View File

@ -62,12 +62,26 @@ struct ICounter {
virtual void remove() {}
};
template<>
struct Traceable<ICounter*> : std::true_type {
static std::string toString(ICounter const *counter) {
if (counter->hasRate() && counter->hasRoughness()) {
return format("%g %g %lld", counter->getRate(), counter->getRoughness(), (long long)counter->getValue());
}
else {
return format("%lld", (long long)counter->getValue());
}
}
};
struct CounterCollection {
CounterCollection(std::string name, std::string id = std::string()) : name(name), id(id) {}
std::vector<struct ICounter*> counters, counters_to_remove;
~CounterCollection() { for (auto c : counters_to_remove) c->remove(); }
std::string name;
std::string id;
void logToTraceEvent(TraceEvent& te) const;
};
struct Counter : ICounter, NonCopyable {
@ -97,6 +111,13 @@ private:
Int64MetricHandle metric;
};
template<>
struct Traceable<Counter> : std::true_type {
static std::string toString(Counter const& counter) {
return Traceable<ICounter*>::toString((ICounter const*)&counter);
}
};
template <class F>
struct SpecialCounter : ICounter, FastAllocated<SpecialCounter<F>>, NonCopyable {
SpecialCounter(CounterCollection& collection, std::string const& name, F && f) : name(name), f(f) { collection.counters.push_back(this); collection.counters_to_remove.push_back(this); }