Started instrumentation

This commit is contained in:
Markus Pilman 2020-05-27 15:36:04 -07:00
parent d748166ed3
commit 8a7d98ab21
8 changed files with 42 additions and 21 deletions

View File

@ -35,6 +35,7 @@ typedef uint64_t Sequence;
typedef StringRef KeyRef;
typedef StringRef ValueRef;
typedef int64_t Generation;
typedef UID TransactionID;
enum {
tagLocalitySpecial = -1,

View File

@ -43,6 +43,7 @@
#include "fdbrpc/simulator.h"
#include "flow/ActorCollection.h"
#include "flow/DeterministicRandom.h"
#include "flow/IRandom.h"
#include "flow/Knobs.h"
#include "flow/Platform.h"
#include "flow/SystemMonitor.h"
@ -1531,10 +1532,12 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
}
choose {
when(wait(cx->connectionFileChanged())) { throw transaction_too_old(); }
when(GetValueReply _reply =
wait(loadBalance(ssi.second, &StorageServerInterface::getValue,
GetValueRequest(key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(), getValueID), TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
when(GetValueReply _reply = wait(
loadBalance(ssi.second, &StorageServerInterface::getValue,
GetValueRequest(key, ver, cx->sampleReadTags() ? tags : Optional<TagSet>(),
info.id, getValueID),
TaskPriority::DefaultPromiseEndpoint, false,
cx->enableLocalityLoadBalance ? &cx->queueModel : nullptr))) {
reply = _reply;
}
}
@ -2774,6 +2777,7 @@ void Transaction::reset() {
void Transaction::fullReset() {
reset();
info.id = deterministicRandom()->randomUniqueID();
backoff = CLIENT_KNOBS->DEFAULT_BACKOFF;
}

View File

@ -19,6 +19,7 @@
*/
#pragma once
#include "flow/IRandom.h"
#if defined(NO_INTELLISENSE) && !defined(FDBCLIENT_NATIVEAPI_ACTOR_G_H)
#define FDBCLIENT_NATIVEAPI_ACTOR_G_H
#include "fdbclient/NativeAPI.actor.g.h"
@ -147,13 +148,15 @@ class ReadYourWritesTransaction; // workaround cyclic dependency
struct TransactionInfo {
Optional<UID> debugID;
TaskPriority taskID;
TransactionID id;
bool useProvisionalProxies;
// Used to save conflicting keys if FDBTransactionOptions::REPORT_CONFLICTING_KEYS is enabled
// prefix/<key1> : '1' - any keys equal or larger than this key are (probably) conflicting keys
// prefix/<key2> : '0' - any keys equal or larger than this key are (definitely) not conflicting keys
std::shared_ptr<CoalescedKeyRangeMap<Value>> conflictingKeys;
explicit TransactionInfo( TaskPriority taskID ) : taskID(taskID), useProvisionalProxies(false) {}
explicit TransactionInfo(TaskPriority taskID)
: taskID(taskID), id(deterministicRandom()->randomUniqueID()), useProvisionalProxies(false) {}
};
struct TransactionLogInfo : public ReferenceCounted<TransactionLogInfo>, NonCopyable {

View File

@ -175,15 +175,16 @@ struct GetValueRequest : TimedRequest {
Key key;
Version version;
Optional<TagSet> tags;
TransactionID txnID;
Optional<UID> debugID;
ReplyPromise<GetValueReply> reply;
GetValueRequest(){}
GetValueRequest(const Key& key, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), version(ver), tags(tags), debugID(debugID) {}
GetValueRequest(const Key& key, Version ver, Optional<TagSet> tags, TransactionID txnID, Optional<UID> debugID) : key(key), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, key, version, tags, debugID, reply);
serializer(ar, key, version, tags, debugID, reply, txnID);
}
};
@ -206,13 +207,16 @@ struct WatchValueRequest {
Optional<Value> value;
Version version;
Optional<TagSet> tags;
TransactionID txnID;
Optional<UID> debugID;
ReplyPromise<WatchValueReply> reply;
WatchValueRequest(){}
WatchValueRequest(const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags, Optional<UID> debugID) : key(key), value(value), version(ver), tags(tags), debugID(debugID) {}
template <class Ar>
WatchValueRequest(const Key& key, Optional<Value> value, Version ver, Optional<TagSet> tags, TransactionID txnID,
Optional<UID> debugID)
: key(key), value(value), version(ver), tags(tags), txnID(txnID), debugID(debugID) {}
template <class Ar>
void serialize( Ar& ar ) {
serializer(ar, key, value, version, tags, debugID, reply);
}

View File

@ -251,8 +251,8 @@ ACTOR Future<Void> getValueQ( StorageCacheData* data, GetValueRequest req ) {
if( req.debugID.present() ) {
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
//FIXME
fbTrace<GetValueDebugTrace>(req.debugID.get().first(), now(), GetValueDebugTrace::STORAGECACHE_GETVALUE_DO_READ);
}
fbTrace<GetValueDebugTrace>(req.txnID, now(), GetValueDebugTrace::STORAGECACHE_GETVALUE_DO_READ);
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version ) );

View File

@ -21,6 +21,7 @@
#include <cinttypes>
#include "fdbrpc/fdbrpc.h"
#include "fdbrpc/LoadBalance.h"
#include "flow/FBTrace.h"
#include "flow/IndexedSet.h"
#include "flow/Hash3.h"
#include "flow/ActorCollection.h"
@ -922,11 +923,13 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
fbTrace<GetValueDebugTrace>(req.txnID, GetValueDebugTrace::STORAGESERVER_GETVALUE_DO_READ);
state Optional<Value> v;
state Version version = wait( waitForVersion( data, req.version ) );
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
fbTrace<GetValueDebugTrace>(req.txnID, GetValueDebugTrace::STORAGESERVER_GETVALUE_AFTER_VERSION);
state uint64_t changeCounter = data->shardChangeCounter;
@ -981,6 +984,7 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
if( req.debugID.present() )
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
fbTrace<GetValueDebugTrace>(req.txnID, req.txnID, GetValueDebugTrace::STORAGESERVER_GETVALUE_AFTER_READ);
GetValueReply reply(v);
reply.penalty = data->getPenalty();
@ -1018,7 +1022,7 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
try {
state Version latest = data->data().latestVersion;
state Future<Void> watchFuture = data->watches.onChange(req.key);
GetValueRequest getReq( req.key, latest, req.tags, req.debugID );
GetValueRequest getReq( req.key, latest, req.tags, req.txnID, req.debugID );
state Future<Void> getValue = getValueQ( data, getReq ); //we are relying on the delay zero at the top of getValueQ, if removed we need one here
GetValueReply reply = wait( getReq.reply.getFuture() );
//TraceEvent("WatcherCheckValue").detail("Key", req.key ).detail("Value", req.value ).detail("CurrentValue", v ).detail("Ver", latest);
@ -3708,7 +3712,7 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
self->actors.add(serveWatchValueRequests(self, ssi.watchValue.getFuture()));
self->transactionTagCounter.startNewInterval(self->thisServerID);
self->actors.add(recurring([this](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
self->actors.add(recurring([&](){ self->transactionTagCounter.startNewInterval(self->thisServerID); }, SERVER_KNOBS->READ_TAG_MEASUREMENT_INTERVAL));
self->coreStarted.send( Void() );

View File

@ -20,6 +20,7 @@
#pragma once
#include "fdbclient/FDBTypes.h"
#include "flow/FileIdentifier.h"
#include "flow/FastRef.h"
#include "flow/ObjectSerializer.h"
@ -132,12 +133,12 @@ public:
READER_GETVALUEPREFIX_AFTER = 11
};
uint64_t id;
TransactionID id;
double time;
int32_t location;
GetValueDebugTrace() {}
GetValueDebugTrace(uint64_t debugID, double t, codeLocation loc) : id(debugID), time(t), location(loc) {}
GetValueDebugTrace(TransactionID id, codeLocation loc, double time = g_network->now()) : id(id), time(time), location(loc) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, time, location);
@ -155,12 +156,12 @@ public:
NATIVEAPI_WATCHVALUE_AFTER_READ = 5
};
uint64_t id;
TransactionID id;
double time;
int32_t location;
WatchValueDebugTrace() {}
WatchValueDebugTrace(uint64_t debugID, double t, codeLocation loc) : id(debugID), time(t), location(loc) {}
WatchValueDebugTrace(TransactionID id, double t, codeLocation loc) : id(id), time(t), location(loc) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, time, location);
@ -194,12 +195,12 @@ public:
TLOG_TLOGCOMMIT_AFTER = 20
};
uint64_t id;
TransactionID id;
double time;
int32_t location;
CommitDebugTrace() {}
CommitDebugTrace(uint64_t debugID, double t, codeLocation loc) : id(debugID), time(t), location(loc) {}
CommitDebugTrace(TransactionID id, double t, codeLocation loc) : id(id), time(t), location(loc) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, time, location);
@ -247,12 +248,12 @@ public:
READWRITE_RANDOMREADWRITECLIENT_AFTER = 35
};
uint64_t id;
TransactionID id;
double time;
int32_t location;
TransactionDebugTrace() {}
TransactionDebugTrace(uint64_t debugID, double t, codeLocation loc) : id(debugID), time(t), location(loc) {}
TransactionDebugTrace(TransactionID id, double t, codeLocation loc) : id(id), time(t), location(loc) {}
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, id, time, location);

View File

@ -22,6 +22,7 @@
#include "flow/Trace.h"
#include "flow/FBTrace.h"
#include "flow/FileTraceLogWriter.h"
#include "flow/Knobs.h"
#include "flow/XmlTraceLogFormatter.h"
#include "flow/JsonTraceLogFormatter.h"
#include "flow/flow.h"
@ -295,6 +296,9 @@ public:
formatter->getExtension(), maxLogsSize,
[this]() { barriers->triggerAll(); }, issues));
if (FLOW_KNOBS->FBTRACES_ENABLED) {
FBTraceImpl::open(directory, processName, basename, rs, maxLogsSize);
}
if ( g_network->isSimulated() )
writer = Reference<IThreadPool>(new DummyThreadPool());
else