Add FBTrace calls for most debugTransaction trace messages.
This commit is contained in:
parent
e24017fae3
commit
be68bd6ad2
|
@ -47,6 +47,7 @@
|
|||
#include "flow/SystemMonitor.h"
|
||||
#include "flow/TLSConfig.actor.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/FBTrace.h"
|
||||
|
||||
#if defined(CMAKE_BUILD) || !defined(WIN32)
|
||||
#include "versions.h"
|
||||
|
@ -1188,8 +1189,12 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
|
|||
ASSERT( key < allKeys.end );
|
||||
}
|
||||
|
||||
if( info.debugID.present() )
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETKEYLOCATION_BEFORE)));
|
||||
}
|
||||
|
||||
loop {
|
||||
++cx->transactionKeyServerLocationRequests;
|
||||
|
@ -1197,8 +1202,12 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
|
|||
when ( wait( cx->onMasterProxiesChanged() ) ) {}
|
||||
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
|
||||
++cx->transactionKeyServerLocationRequestsCompleted;
|
||||
if( info.debugID.present() )
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETKEYLOCATION_AFTER)));
|
||||
}
|
||||
ASSERT( rep.results.size() == 1 );
|
||||
|
||||
auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
|
||||
|
@ -1227,8 +1236,12 @@ Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation( Database const&
|
|||
}
|
||||
|
||||
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
|
||||
if( info.debugID.present() )
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETKEYLOCATIONS_BEFORE)));
|
||||
}
|
||||
|
||||
loop {
|
||||
++cx->transactionKeyServerLocationRequests;
|
||||
|
@ -1237,8 +1250,12 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
|
|||
when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
|
||||
++cx->transactionKeyServerLocationRequestsCompleted;
|
||||
state GetKeyServerLocationsReply rep = _rep;
|
||||
if( info.debugID.present() )
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETKEYLOCATIONS_AFTER)));
|
||||
}
|
||||
ASSERT( rep.results.size() );
|
||||
|
||||
state vector< pair<KeyRange,Reference<LocationInfo>> > results;
|
||||
|
@ -1338,6 +1355,9 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
|||
|
||||
g_traceBatch.addAttach("GetValueAttachID", info.debugID.get().first(), getValueID.get().first());
|
||||
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(getValueID.get().first(), now(),
|
||||
GetValueDebugTrace::NATIVEAPI_GETVALUE_BEFORE)));
|
||||
/*TraceEvent("TransactionDebugGetValueInfo", getValueID.get())
|
||||
.detail("Key", key)
|
||||
.detail("ReqVersion", ver)
|
||||
|
@ -1382,6 +1402,9 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
|||
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(getValueID.get().first(), now(),
|
||||
GetValueDebugTrace::NATIVEAPI_GETVALUE_AFTER)));
|
||||
/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
|
||||
.detail("Key", key)
|
||||
.detail("ReqVersion", ver)
|
||||
|
@ -1396,6 +1419,9 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
|||
cx->getValueCompleted->log();
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Error"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(getValueID.get().first(), now(),
|
||||
GetValueDebugTrace::NATIVEAPI_GETVALUE_ERROR)));
|
||||
/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
|
||||
.detail("Key", key)
|
||||
.detail("ReqVersion", ver)
|
||||
|
@ -1417,8 +1443,12 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
|||
ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, TransactionInfo info ) {
|
||||
wait(success(version));
|
||||
|
||||
if( info.debugID.present() )
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.AfterVersion");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETKEY_AFTERVERSION)));
|
||||
}
|
||||
|
||||
loop {
|
||||
if (k.getKey() == allKeys.end) {
|
||||
|
@ -1433,8 +1463,12 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
|
|||
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward()) );
|
||||
|
||||
try {
|
||||
if( info.debugID.present() )
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual);
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETKEY_BEFORE)));
|
||||
}
|
||||
++cx->transactionPhysicalReads;
|
||||
state GetKeyReply reply;
|
||||
try {
|
||||
|
@ -1452,8 +1486,12 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
|
|||
++cx->transactionPhysicalReadsCompleted;
|
||||
throw;
|
||||
}
|
||||
if( info.debugID.present() )
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", reply.sel.offset).detail("OrEqual", k.orEqual);
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETKEY_AFTER)));
|
||||
}
|
||||
k = reply.sel;
|
||||
if (!k.offset && k.orEqual) {
|
||||
return k.getKey();
|
||||
|
@ -1526,6 +1564,9 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
|
|||
|
||||
g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first());
|
||||
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(watchValueID.get().first(), now(),
|
||||
WatchValueDebugTrace::NATIVEAPI_WATCHVALUE_BEFORE)));
|
||||
}
|
||||
state WatchValueReply resp;
|
||||
choose {
|
||||
|
@ -1538,6 +1579,9 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
|
|||
}
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(watchValueID.get().first(), now(),
|
||||
WatchValueDebugTrace::NATIVEAPI_WATCHVALUE_AFTER)));
|
||||
}
|
||||
|
||||
//FIXME: wait for known committed version on the storage server before replying,
|
||||
|
@ -1618,6 +1662,9 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
|
|||
try {
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_BEFORE)));
|
||||
/*TraceEvent("TransactionDebugGetExactRangeInfo", info.debugID.get())
|
||||
.detail("ReqBeginKey", req.begin.getKey())
|
||||
.detail("ReqEndKey", req.end.getKey())
|
||||
|
@ -1644,8 +1691,12 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
|
|||
++cx->transactionPhysicalReadsCompleted;
|
||||
throw;
|
||||
}
|
||||
if( info.debugID.present() )
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_AFTER)));
|
||||
}
|
||||
output.arena().dependsOn( rep.arena );
|
||||
output.append( output.arena(), rep.data.begin(), rep.data.size() );
|
||||
|
||||
|
@ -1904,6 +1955,9 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
|||
try {
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETRANGE_BEFORE)));
|
||||
/*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get())
|
||||
.detail("ReqBeginKey", req.begin.getKey())
|
||||
.detail("ReqEndKey", req.end.getKey())
|
||||
|
@ -1939,6 +1993,9 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
|||
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size());
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_AFTER)));
|
||||
/*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get())
|
||||
.detail("ReqBeginKey", req.begin.getKey())
|
||||
.detail("ReqEndKey", req.end.getKey())
|
||||
|
@ -2026,6 +2083,9 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
|||
} catch ( Error& e ) {
|
||||
if( info.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETRANGE_ERROR)));
|
||||
TraceEvent("TransactionDebugError", info.debugID.get()).error(e);
|
||||
}
|
||||
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
|
||||
|
@ -2712,6 +2772,9 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
commitID = nondeterministicRandom()->randomUniqueID();
|
||||
g_traceBatch.addAttach("CommitAttachID", info.debugID.get().first(), commitID.get().first());
|
||||
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(),
|
||||
CommitDebugTrace::NATIVEAPI_COMMIT_BEFORE)));
|
||||
}
|
||||
|
||||
req.debugID = commitID;
|
||||
|
@ -2755,9 +2818,12 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
cx->transactionCommittedMutations += req.transaction.mutations.size();
|
||||
cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize();
|
||||
|
||||
if(info.debugID.present())
|
||||
if(info.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
|
||||
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(),
|
||||
CommitDebugTrace::NATIVEAPI_COMMIT_AFTER)));
|
||||
}
|
||||
double latency = now() - startTime;
|
||||
cx->commitLatencies.addSample(latency);
|
||||
cx->latencies.addSample(now() - tr->startTime);
|
||||
|
@ -2807,9 +2873,12 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
|||
if (info.debugID.present())
|
||||
TraceEvent(interval.end()).detail("Conflict", 1);
|
||||
|
||||
if(info.debugID.present())
|
||||
if(info.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
|
||||
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(),
|
||||
CommitDebugTrace::NATIVEAPI_COMMIT_AFTER)));
|
||||
}
|
||||
throw not_committed();
|
||||
}
|
||||
}
|
||||
|
@ -3139,15 +3208,23 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
|
|||
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Optional<UID> debugID ) {
|
||||
try {
|
||||
++cx->transactionReadVersionBatches;
|
||||
if( debugID.present() )
|
||||
if( debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETCONSISTENTREADVERSION_BEFORE)));
|
||||
}
|
||||
loop {
|
||||
state GetReadVersionRequest req( transactionCount, flags, debugID );
|
||||
choose {
|
||||
when ( wait( cx->onMasterProxiesChanged() ) ) {}
|
||||
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
|
||||
if( debugID.present() )
|
||||
if( debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
||||
TransactionDebugTrace::NATIVEAPI_GETCONSISTENTREADVERSION_AFTER)));
|
||||
}
|
||||
ASSERT( v.version > 0 );
|
||||
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
|
||||
return v;
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "fdbserver/CoroFlow.h"
|
||||
#include "fdbserver/Knobs.h"
|
||||
#include "flow/Hash3.h"
|
||||
#include "flow/FBTrace.h"
|
||||
|
||||
extern "C" {
|
||||
#include "fdbserver/sqlite/sqliteInt.h"
|
||||
|
@ -1530,12 +1531,22 @@ private:
|
|||
};
|
||||
void action( ReadValueAction& rv ) {
|
||||
//double t = timer();
|
||||
if (rv.debugID.present()) g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
if (rv.debugID.present()) {
|
||||
g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(),
|
||||
GetValueDebugTrace::READER_GETVALUE_BEFORE)));
|
||||
}
|
||||
|
||||
rv.result.send( getCursor()->get().get(rv.key) );
|
||||
++counter;
|
||||
|
||||
if (rv.debugID.present()) g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
if (rv.debugID.present()) {
|
||||
g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(),
|
||||
GetValueDebugTrace::READER_GETVALUE_AFTER)));
|
||||
}
|
||||
//t = timer()-t;
|
||||
//if (t >= 1.0) TraceEvent("ReadValueActionSlow",dbgid).detail("Elapsed", t);
|
||||
}
|
||||
|
@ -1550,12 +1561,22 @@ private:
|
|||
};
|
||||
void action( ReadValuePrefixAction& rv ) {
|
||||
//double t = timer();
|
||||
if (rv.debugID.present()) g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
if (rv.debugID.present()) {
|
||||
g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(),
|
||||
GetValueDebugTrace::READER_GETVALUEPREFIX_BEFORE)));
|
||||
}
|
||||
|
||||
rv.result.send( getCursor()->get().getPrefix(rv.key, rv.maxLength) );
|
||||
++counter;
|
||||
|
||||
if (rv.debugID.present()) g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
if (rv.debugID.present()) {
|
||||
g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(),
|
||||
GetValueDebugTrace::READER_GETVALUEPREFIX_AFTER)));
|
||||
}
|
||||
//t = timer()-t;
|
||||
//if (t >= 1.0) TraceEvent("ReadValuePrefixActionSlow",dbgid).detail("Elapsed", t);
|
||||
}
|
||||
|
|
|
@ -45,6 +45,7 @@
|
|||
#include "flow/ActorCollection.h"
|
||||
#include "flow/Knobs.h"
|
||||
#include "flow/Stats.h"
|
||||
#include "flow/FBTrace.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
|
@ -191,8 +192,12 @@ ACTOR Future<Void> queueTransactionStartRequests(
|
|||
req.reply.send(rep);
|
||||
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
|
||||
} else {
|
||||
if (req.debugID.present())
|
||||
if (req.debugID.present()) {
|
||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::MASTERPROXYSERVER_QUEUETRANSACTIONSTARTREQUESTS_BEFORE)));
|
||||
}
|
||||
|
||||
if (systemQueue->empty() && defaultQueue->empty() && batchQueue->empty()) {
|
||||
forwardPromise(GRVTimer, delayJittered(std::max(0.0, *GRVBatchTime - (now() - *lastGRVTime)), TaskPriority::ProxyGRVTimer));
|
||||
|
@ -496,6 +501,9 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
|
|||
|
||||
if(req.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", req.debugID.get().first(), "MasterProxyServer.batcher");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::MASTERPROXYSERVER_BATCHER)));
|
||||
}
|
||||
|
||||
if(!batch.size()) {
|
||||
|
@ -684,8 +692,12 @@ ACTOR Future<Void> commitBatch(
|
|||
TraceEvent("SecondCommitBatch", self->dbgid).detail("DebugID", debugID.get());
|
||||
}
|
||||
|
||||
if (debugID.present())
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_BEFORE)));
|
||||
}
|
||||
|
||||
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
|
||||
|
||||
|
@ -694,8 +706,12 @@ ACTOR Future<Void> commitBatch(
|
|||
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
|
||||
state Future<Void> releaseDelay = delay(std::min(SERVER_KNOBS->MAX_PROXY_COMPUTE, batchOperations*self->commitComputePerOperation[latencyBucket]), TaskPriority::ProxyMasterVersionReply);
|
||||
|
||||
if (debugID.present())
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_GETTINGCOMMITVERSION)));
|
||||
}
|
||||
|
||||
GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
|
||||
GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) );
|
||||
|
@ -715,8 +731,12 @@ ACTOR Future<Void> commitBatch(
|
|||
|
||||
//TraceEvent("ProxyGotVer", self->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion);
|
||||
|
||||
if (debugID.present())
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_GOTCOMMITVERSION)));
|
||||
}
|
||||
|
||||
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
|
||||
int conflictRangeCount = 0;
|
||||
|
@ -748,8 +768,12 @@ ACTOR Future<Void> commitBatch(
|
|||
/////// Phase 2: Resolution (waiting on the network; pipelined)
|
||||
state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) );
|
||||
|
||||
if (debugID.present())
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterResolution");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERRESOLUTION)));
|
||||
}
|
||||
|
||||
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be)
|
||||
TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber - 1); // Queuing post-resolution commit processing
|
||||
|
@ -760,8 +784,12 @@ ACTOR Future<Void> commitBatch(
|
|||
state double computeDuration = 0;
|
||||
self->stats.txnCommitResolved += trs.size();
|
||||
|
||||
if (debugID.present())
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.ProcessingMutations");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_PROCESSINGMUTATIONS)));
|
||||
}
|
||||
|
||||
state Arena arena;
|
||||
state bool isMyFirstBatch = !self->version;
|
||||
|
@ -1058,8 +1086,12 @@ ACTOR Future<Void> commitBatch(
|
|||
|
||||
state LogSystemDiskQueueAdapter::CommitMessage msg = storeCommits.back().first.get();
|
||||
|
||||
if (debugID.present())
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterStoreCommits");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERSTORECOMMITS)));
|
||||
}
|
||||
|
||||
// txnState (transaction subsystem state) tag: message extracted from log adapter
|
||||
bool firstMessage = true;
|
||||
|
@ -1133,8 +1165,12 @@ ACTOR Future<Void> commitBatch(
|
|||
debug_advanceMinCommittedVersion(UID(), commitVersion);
|
||||
|
||||
//TraceEvent("ProxyPushed", self->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion);
|
||||
if (debugID.present())
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterLogPush");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERLOGPUSH)));
|
||||
}
|
||||
|
||||
for (auto &p : storeCommits) {
|
||||
ASSERT(!p.second.isReady());
|
||||
|
@ -1261,6 +1297,9 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
|
|||
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.confirmEpochLive");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
||||
TransactionDebugTrace::MASTERPROXYSERVER_GETLIVECOMMITTEDVERSION_CONFIRMEPOCHLIVE)));
|
||||
}
|
||||
|
||||
vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
|
||||
|
@ -1277,6 +1316,9 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
|
|||
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
||||
TransactionDebugTrace::MASTERPROXYSERVER_GETLIVECOMMITTEDVERSION_AFTER)));
|
||||
}
|
||||
|
||||
commitData->stats.txnStartOut += transactionCount;
|
||||
|
@ -1435,6 +1477,9 @@ ACTOR static Future<Void> transactionStarter(
|
|||
|
||||
if (debugID.present()) {
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
||||
TransactionDebugTrace::MASTERPROXYSERVER_MASTERPROXYSERVERCORE_BROADCAST)));
|
||||
}
|
||||
|
||||
for (int i = 0; i < start.size(); i++) {
|
||||
|
@ -1881,8 +1926,12 @@ ACTOR Future<Void> masterProxyServerCore(
|
|||
}
|
||||
when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
|
||||
//TraceEvent("ProxyGetRCV", proxy.id());
|
||||
if (req.debugID.present())
|
||||
if (req.debugID.present()) {
|
||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::MASTERPROXYSERVER_MASTERPROXYSERVERCORE_GETRAWCOMMITTEDVERSION)));
|
||||
}
|
||||
GetReadVersionReply rep;
|
||||
rep.locked = commitData.locked;
|
||||
rep.metadataVersion = commitData.metadataVersion;
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "fdbserver/ConflictSet.h"
|
||||
#include "fdbserver/StorageMetrics.h"
|
||||
#include "fdbclient/SystemData.h"
|
||||
#include "flow/FBTrace.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
namespace {
|
||||
|
@ -115,6 +116,9 @@ ACTOR Future<Void> resolveBatch(
|
|||
debugID = nondeterministicRandom()->randomUniqueID();
|
||||
g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), debugID.get().first());
|
||||
g_traceBatch.addEvent("CommitDebug",debugID.get().first(),"Resolver.resolveBatch.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::RESOLVER_RESOLVEBATCH_BEFORE)));
|
||||
}
|
||||
|
||||
/*TraceEvent("ResolveBatchStart", self->dbgid).detail("From", proxyAddress).detail("Version", req.version).detail("PrevVersion", req.prevVersion).detail("StateTransactions", req.txnStateTransactions.size())
|
||||
|
@ -132,6 +136,9 @@ ACTOR Future<Void> resolveBatch(
|
|||
|
||||
if(debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug",debugID.get().first(),"Resolver.resolveBatch.AfterQueueSizeCheck");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTERQUEUESIZECHECK)));
|
||||
}
|
||||
|
||||
loop {
|
||||
|
@ -164,8 +171,12 @@ ACTOR Future<Void> resolveBatch(
|
|||
Version firstUnseenVersion = proxyInfo.lastVersion + 1;
|
||||
proxyInfo.lastVersion = req.version;
|
||||
|
||||
if(req.debugID.present())
|
||||
if(req.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTERORDERER)));
|
||||
}
|
||||
|
||||
ResolveTransactionBatchReply& reply = proxyInfo.outstandingBatches[req.version];
|
||||
|
||||
|
@ -276,8 +287,12 @@ ACTOR Future<Void> resolveBatch(
|
|||
self->checkNeededVersion.trigger();
|
||||
}
|
||||
|
||||
if(req.debugID.present())
|
||||
if(req.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.After");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTER)));
|
||||
}
|
||||
}
|
||||
else {
|
||||
TEST(true); // Duplicate resolve batch request
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -21,6 +21,7 @@
|
|||
#include "flow/Hash3.h"
|
||||
#include "flow/Stats.h"
|
||||
#include "flow/UnitTest.h"
|
||||
#include "flow/FBTrace.h"
|
||||
#include "fdbclient/NativeAPI.actor.h"
|
||||
#include "fdbclient/Notified.h"
|
||||
#include "fdbclient/KeyRangeMap.h"
|
||||
|
@ -1766,6 +1767,9 @@ ACTOR Future<Void> tLogCommit(
|
|||
tlogDebugID = nondeterministicRandom()->randomUniqueID();
|
||||
g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), tlogDebugID.get().first());
|
||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.BeforeWaitForVersion");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::TLOG_TLOGCOMMIT_BEFOREWAITFORVERSION)));
|
||||
}
|
||||
|
||||
logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, req.minKnownCommittedVersion);
|
||||
|
@ -1795,8 +1799,12 @@ ACTOR Future<Void> tLogCommit(
|
|||
}
|
||||
|
||||
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
|
||||
if(req.debugID.present())
|
||||
if(req.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::TLOG_TLOGCOMMIT_BEFORE)));
|
||||
}
|
||||
|
||||
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
|
||||
commitMessages(self, logData, req.version, req.arena, req.messages);
|
||||
|
@ -1819,8 +1827,12 @@ ACTOR Future<Void> tLogCommit(
|
|||
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
|
||||
logData->version.set( req.version );
|
||||
|
||||
if(req.debugID.present())
|
||||
if(req.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::TLOG_TLOGCOMMIT_AFTERTLOGCOMMIT)));
|
||||
}
|
||||
}
|
||||
// Send replies only once all prior messages have been received and committed.
|
||||
state Future<Void> stopped = logData->stopCommit.onTrigger();
|
||||
|
@ -1832,8 +1844,12 @@ ACTOR Future<Void> tLogCommit(
|
|||
return Void();
|
||||
}
|
||||
|
||||
if(req.debugID.present())
|
||||
if(req.debugID.present()) {
|
||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
|
||||
//FIXME
|
||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
||||
CommitDebugTrace::TLOG_TLOGCOMMIT_AFTER)));
|
||||
}
|
||||
|
||||
req.reply.send( logData->durableKnownCommittedVersion );
|
||||
return Void();
|
||||
|
@ -2099,6 +2115,9 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
|
|||
UID tlogDebugID = nondeterministicRandom()->randomUniqueID();
|
||||
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first());
|
||||
g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::TLOGSERVER_TLOGCONFIRMRUNNINGREQUEST)));
|
||||
}
|
||||
if (!logData->stopped)
|
||||
req.reply.send(Void());
|
||||
|
|
|
@ -51,6 +51,7 @@
|
|||
#include "fdbrpc/sim_validation.h"
|
||||
#include "fdbrpc/Smoother.h"
|
||||
#include "flow/Stats.h"
|
||||
#include "flow/FBTrace.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include <type_traits>
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
@ -838,13 +839,21 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
|
|||
// so we need to downgrade here
|
||||
wait( delay(0, TaskPriority::DefaultEndpoint) );
|
||||
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
|
||||
GetValueDebugTrace::STORAGESERVER_GETVALUE_DOREAD)));
|
||||
}
|
||||
|
||||
state Optional<Value> v;
|
||||
state Version version = wait( waitForVersion( data, req.version ) );
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
|
||||
GetValueDebugTrace::STORAGESERVER_GETVALUE_AFTERVERSION)));
|
||||
}
|
||||
|
||||
state uint64_t changeCounter = data->shardChangeCounter;
|
||||
|
||||
|
@ -897,8 +906,12 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
|
|||
data->metrics.notifyBytesReadPerKSecond(req.key, bytesReadPerKSecond);
|
||||
}
|
||||
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
|
||||
GetValueDebugTrace::STORAGESERVER_GETVALUE_AFTERREAD)));
|
||||
}
|
||||
|
||||
GetValueReply reply(v);
|
||||
reply.penalty = data->getPenalty();
|
||||
|
@ -923,12 +936,20 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
|
|||
try {
|
||||
++data->counters.watchQueries;
|
||||
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
|
||||
WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_BEFORE)));
|
||||
}
|
||||
|
||||
wait(success(waitForVersionNoTooOld(data, req.version)));
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
|
||||
WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_AFTERVERSION)));
|
||||
}
|
||||
|
||||
loop {
|
||||
try {
|
||||
|
@ -945,8 +966,12 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
|
|||
|
||||
debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
|
||||
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
|
||||
WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_AFTERREAD)));
|
||||
}
|
||||
|
||||
if( reply.value != req.value ) {
|
||||
req.reply.send(WatchValueReply{ latest });
|
||||
|
@ -1351,16 +1376,24 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
|||
wait( delay(0, taskType) );
|
||||
|
||||
try {
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_BEFORE)));
|
||||
}
|
||||
state Version version = wait( waitForVersion( data, req.version ) );
|
||||
|
||||
state uint64_t changeCounter = data->shardChangeCounter;
|
||||
// try {
|
||||
state KeyRange shard = getShardKeyRange( data, req.begin );
|
||||
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERVERSION)));
|
||||
}
|
||||
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
|
||||
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", "None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
|
||||
|
||||
|
@ -1375,8 +1408,12 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
|||
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
|
||||
state Key begin = wait(fBegin);
|
||||
state Key end = wait(fEnd);
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERKEYS)));
|
||||
}
|
||||
//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
|
||||
|
||||
// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
|
||||
|
@ -1392,8 +1429,12 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
|||
}
|
||||
|
||||
if (begin >= end) {
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Send");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_SEND)));
|
||||
}
|
||||
//.detail("Begin",begin).detail("End",end);
|
||||
|
||||
GetKeyValuesReply none;
|
||||
|
@ -1409,8 +1450,12 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
|||
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
|
||||
GetKeyValuesReply r = _r;
|
||||
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterReadRange");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERREADRANGE)));
|
||||
}
|
||||
//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
|
||||
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(begin, std::min<KeyRef>(req.begin.getKey(), req.end.getKey())), std::max<KeyRef>(end, std::max<KeyRef>(req.begin.getKey(), req.end.getKey())) ) );
|
||||
if (EXPENSIVE_VALIDATION) {
|
||||
|
@ -3602,8 +3647,12 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
|||
}
|
||||
when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
|
||||
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
|
||||
if( req.debugID.present() )
|
||||
if( req.debugID.present() ) {
|
||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
|
||||
//FIXME
|
||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
|
||||
GetValueDebugTrace::STORAGESERVER_GETVALUE_RECEIVED)));
|
||||
}
|
||||
|
||||
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
|
||||
req.reply.send(GetValueReply());
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "fdbserver/ClusterRecruitmentInterface.h"
|
||||
#include "fdbclient/ReadYourWrites.h"
|
||||
#include "flow/TDMetric.actor.h"
|
||||
#include "flow/FBTrace.h"
|
||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||
|
||||
const int sampleSize = 10000;
|
||||
|
@ -618,6 +619,9 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
debugID = deterministicRandom()->randomUniqueID();
|
||||
tr.debugTransaction(debugID);
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.Before");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.first(), now(),
|
||||
TransactionDebugTrace::READWRITE_RANDOMREADWRITECLIENT_BEFORE)));
|
||||
}
|
||||
else {
|
||||
debugID = UID();
|
||||
|
@ -690,8 +694,12 @@ struct ReadWriteWorkload : KVWorkload {
|
|||
}
|
||||
}
|
||||
|
||||
if(debugID != UID())
|
||||
if(debugID != UID()) {
|
||||
g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.After");
|
||||
//FIXME
|
||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.first(), now(),
|
||||
TransactionDebugTrace::READWRITE_RANDOMREADWRITECLIENT_AFTER)));
|
||||
}
|
||||
|
||||
tr = Trans();
|
||||
|
||||
|
|
|
@ -44,17 +44,20 @@ public:
|
|||
constexpr static FileIdentifier file_identifier = 617894;
|
||||
enum codeLocation {
|
||||
STORAGESERVER_GETVALUE_RECEIVED = 0,
|
||||
STORAGESERVER_GETVALUE_DO_READ = 1,
|
||||
STORAGESERVER_GETVALUE_AFTER_VERSION = 2,
|
||||
STORAGESERVER_GETVALUE_AFTER_READ = 3,
|
||||
STORAGESERVER_GETVALUE_DOREAD = 1,
|
||||
STORAGESERVER_GETVALUE_AFTERVERSION = 2,
|
||||
STORAGESERVER_GETVALUE_AFTERREAD = 3,
|
||||
STORAGECACHE_GETVALUE_RECEIVED = 4,
|
||||
STORAGECACHE_GETVALUE_DO_READ = 5,
|
||||
STORAGECACHE_GETVALUE_AFTER_VERSION = 6,
|
||||
STORAGECACHE_GETVALUE_AFTER_READ = 7,
|
||||
STORAGECACHE_GETVALUE_DOREAD = 5,
|
||||
STORAGECACHE_GETVALUE_AFTERVERSION = 6,
|
||||
STORAGECACHE_GETVALUE_AFTERREAD = 7,
|
||||
READER_GETVALUE_BEFORE = 8,
|
||||
READER_GETVALUE_AFTER = 9,
|
||||
READER_GETVALUEPREFIX_BEFORE = 10,
|
||||
READER_GETVALUEPREFIX_AFTER = 11
|
||||
READER_GETVALUEPREFIX_AFTER = 11,
|
||||
NATIVEAPI_GETVALUE_BEFORE = 12,
|
||||
NATIVEAPI_GETVALUE_AFTER = 13,
|
||||
NATIVEAPI_GETVALUE_ERROR = 14
|
||||
};
|
||||
|
||||
uint64_t id;
|
||||
|
@ -74,10 +77,10 @@ public:
|
|||
constexpr static FileIdentifier file_identifier = 14486715;
|
||||
enum codeLocation {
|
||||
STORAGESERVER_WATCHVALUE_BEFORE = 1,
|
||||
STORAGESERVER_WATCHVALUE_AFTER_VERSION = 2,
|
||||
STORAGESERVER_WATCHVALUE_AFTER_READ = 3,
|
||||
STORAGESERVER_WATCHVALUE_AFTERVERSION = 2,
|
||||
STORAGESERVER_WATCHVALUE_AFTERREAD = 3,
|
||||
NATIVEAPI_WATCHVALUE_BEFORE = 4,
|
||||
NATIVEAPI_WATCHVALUE_AFTER_READ = 5
|
||||
NATIVEAPI_WATCHVALUE_AFTER = 5
|
||||
};
|
||||
|
||||
uint64_t id;
|
||||
|
@ -99,9 +102,9 @@ public:
|
|||
STORAGESERVER_COMMIT_BEORE = 0,
|
||||
STORAGESERVER_COMMIT_AFTER_VERSION = 1,
|
||||
STORAGESERVER_COMMIT_AFTER_READ = 2,
|
||||
NATIVEAPI_COMMIT_BEORE = 3,
|
||||
NATIVEAPI_COMMIT_BEFORE = 3,
|
||||
NATIVEAPI_COMMIT_AFTER = 4,
|
||||
MASTERROXYSERVER_BATCHER = 5,
|
||||
MASTERPROXYSERVER_BATCHER = 5,
|
||||
MASTERPROXYSERVER_COMMITBATCH_BEFORE = 6,
|
||||
MASTERPROXYSERVER_COMMITBATCH_GETTINGCOMMITVERSION = 7,
|
||||
MASTERPROXYSERVER_COMMITBATCH_GOTCOMMITVERSION = 8,
|
||||
|
|
Loading…
Reference in New Issue