Revert "Add FBTrace calls for most debugTransaction trace messages."
This reverts commit be68bd6ad2
.
This commit is contained in:
parent
9f114d96c1
commit
e22b88d857
|
@ -47,7 +47,6 @@
|
||||||
#include "flow/SystemMonitor.h"
|
#include "flow/SystemMonitor.h"
|
||||||
#include "flow/TLSConfig.actor.h"
|
#include "flow/TLSConfig.actor.h"
|
||||||
#include "flow/UnitTest.h"
|
#include "flow/UnitTest.h"
|
||||||
#include "flow/FBTrace.h"
|
|
||||||
|
|
||||||
#if defined(CMAKE_BUILD) || !defined(WIN32)
|
#if defined(CMAKE_BUILD) || !defined(WIN32)
|
||||||
#include "versions.h"
|
#include "versions.h"
|
||||||
|
@ -1198,12 +1197,8 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
|
||||||
ASSERT( key < allKeys.end );
|
ASSERT( key < allKeys.end );
|
||||||
}
|
}
|
||||||
|
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETKEYLOCATION_BEFORE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
++cx->transactionKeyServerLocationRequests;
|
++cx->transactionKeyServerLocationRequests;
|
||||||
|
@ -1211,12 +1206,8 @@ ACTOR Future< pair<KeyRange,Reference<LocationInfo>> > getKeyLocation_internal(
|
||||||
when ( wait( cx->onMasterProxiesChanged() ) ) {}
|
when ( wait( cx->onMasterProxiesChanged() ) ) {}
|
||||||
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
|
when ( GetKeyServerLocationsReply rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(key, Optional<KeyRef>(), 100, isBackward, key.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
|
||||||
++cx->transactionKeyServerLocationRequestsCompleted;
|
++cx->transactionKeyServerLocationRequestsCompleted;
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocation.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETKEYLOCATION_AFTER)));
|
|
||||||
}
|
|
||||||
ASSERT( rep.results.size() == 1 );
|
ASSERT( rep.results.size() == 1 );
|
||||||
|
|
||||||
auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
|
auto locationInfo = cx->setCachedLocation(rep.results[0].first, rep.results[0].second);
|
||||||
|
@ -1245,12 +1236,8 @@ Future<pair<KeyRange, Reference<LocationInfo>>> getKeyLocation( Database const&
|
||||||
}
|
}
|
||||||
|
|
||||||
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
|
ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLocations_internal( Database cx, KeyRange keys, int limit, bool reverse, TransactionInfo info ) {
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETKEYLOCATIONS_BEFORE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
++cx->transactionKeyServerLocationRequests;
|
++cx->transactionKeyServerLocationRequests;
|
||||||
|
@ -1259,12 +1246,8 @@ ACTOR Future< vector< pair<KeyRange,Reference<LocationInfo>> > > getKeyRangeLoca
|
||||||
when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
|
when ( GetKeyServerLocationsReply _rep = wait( loadBalance( cx->getMasterProxies(info.useProvisionalProxies), &MasterProxyInterface::getKeyServersLocations, GetKeyServerLocationsRequest(keys.begin, keys.end, limit, reverse, keys.arena()), TaskPriority::DefaultPromiseEndpoint ) ) ) {
|
||||||
++cx->transactionKeyServerLocationRequestsCompleted;
|
++cx->transactionKeyServerLocationRequestsCompleted;
|
||||||
state GetKeyServerLocationsReply rep = _rep;
|
state GetKeyServerLocationsReply rep = _rep;
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKeyLocations.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETKEYLOCATIONS_AFTER)));
|
|
||||||
}
|
|
||||||
ASSERT( rep.results.size() );
|
ASSERT( rep.results.size() );
|
||||||
|
|
||||||
state vector< pair<KeyRange,Reference<LocationInfo>> > results;
|
state vector< pair<KeyRange,Reference<LocationInfo>> > results;
|
||||||
|
@ -1364,9 +1347,6 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
||||||
|
|
||||||
g_traceBatch.addAttach("GetValueAttachID", info.debugID.get().first(), getValueID.get().first());
|
g_traceBatch.addAttach("GetValueAttachID", info.debugID.get().first(), getValueID.get().first());
|
||||||
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(getValueID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::NATIVEAPI_GETVALUE_BEFORE)));
|
|
||||||
/*TraceEvent("TransactionDebugGetValueInfo", getValueID.get())
|
/*TraceEvent("TransactionDebugGetValueInfo", getValueID.get())
|
||||||
.detail("Key", key)
|
.detail("Key", key)
|
||||||
.detail("ReqVersion", ver)
|
.detail("ReqVersion", ver)
|
||||||
|
@ -1411,9 +1391,6 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
||||||
|
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() ) {
|
||||||
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.After"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(getValueID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::NATIVEAPI_GETVALUE_AFTER)));
|
|
||||||
/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
|
/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
|
||||||
.detail("Key", key)
|
.detail("Key", key)
|
||||||
.detail("ReqVersion", ver)
|
.detail("ReqVersion", ver)
|
||||||
|
@ -1428,9 +1405,6 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
||||||
cx->getValueCompleted->log();
|
cx->getValueCompleted->log();
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() ) {
|
||||||
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Error"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("GetValueDebug", getValueID.get().first(), "NativeAPI.getValue.Error"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(getValueID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::NATIVEAPI_GETVALUE_ERROR)));
|
|
||||||
/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
|
/*TraceEvent("TransactionDebugGetValueDone", getValueID.get())
|
||||||
.detail("Key", key)
|
.detail("Key", key)
|
||||||
.detail("ReqVersion", ver)
|
.detail("ReqVersion", ver)
|
||||||
|
@ -1452,12 +1426,8 @@ ACTOR Future<Optional<Value>> getValue( Future<Version> version, Key key, Databa
|
||||||
ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, TransactionInfo info ) {
|
ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, TransactionInfo info ) {
|
||||||
wait(success(version));
|
wait(success(version));
|
||||||
|
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.AfterVersion");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.AfterVersion");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETKEY_AFTERVERSION)));
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if (k.getKey() == allKeys.end) {
|
if (k.getKey() == allKeys.end) {
|
||||||
|
@ -1472,12 +1442,8 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
|
||||||
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward()) );
|
state pair<KeyRange, Reference<LocationInfo>> ssi = wait( getKeyLocation(cx, locationKey, &StorageServerInterface::getKey, info, k.isBackward()) );
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual);
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.Before"); //.detail("StartKey", k.getKey()).detail("Offset",k.offset).detail("OrEqual",k.orEqual);
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETKEY_BEFORE)));
|
|
||||||
}
|
|
||||||
++cx->transactionPhysicalReads;
|
++cx->transactionPhysicalReads;
|
||||||
state GetKeyReply reply;
|
state GetKeyReply reply;
|
||||||
try {
|
try {
|
||||||
|
@ -1495,12 +1461,8 @@ ACTOR Future<Key> getKey( Database cx, KeySelector k, Future<Version> version, T
|
||||||
++cx->transactionPhysicalReadsCompleted;
|
++cx->transactionPhysicalReadsCompleted;
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", reply.sel.offset).detail("OrEqual", k.orEqual);
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getKey.After"); //.detail("NextKey",reply.sel.key).detail("Offset", reply.sel.offset).detail("OrEqual", k.orEqual);
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETKEY_AFTER)));
|
|
||||||
}
|
|
||||||
k = reply.sel;
|
k = reply.sel;
|
||||||
if (!k.offset && k.orEqual) {
|
if (!k.offset && k.orEqual) {
|
||||||
return k.getKey();
|
return k.getKey();
|
||||||
|
@ -1573,9 +1535,6 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
|
||||||
|
|
||||||
g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first());
|
g_traceBatch.addAttach("WatchValueAttachID", info.debugID.get().first(), watchValueID.get().first());
|
||||||
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(watchValueID.get().first(), now(),
|
|
||||||
WatchValueDebugTrace::NATIVEAPI_WATCHVALUE_BEFORE)));
|
|
||||||
}
|
}
|
||||||
state WatchValueReply resp;
|
state WatchValueReply resp;
|
||||||
choose {
|
choose {
|
||||||
|
@ -1588,9 +1547,6 @@ ACTOR Future<Void> watchValue(Future<Version> version, Key key, Optional<Value>
|
||||||
}
|
}
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() ) {
|
||||||
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("WatchValueDebug", watchValueID.get().first(), "NativeAPI.watchValue.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(watchValueID.get().first(), now(),
|
|
||||||
WatchValueDebugTrace::NATIVEAPI_WATCHVALUE_AFTER)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//FIXME: wait for known committed version on the storage server before replying,
|
//FIXME: wait for known committed version on the storage server before replying,
|
||||||
|
@ -1671,9 +1627,6 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
|
||||||
try {
|
try {
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() ) {
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.Before");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_BEFORE)));
|
|
||||||
/*TraceEvent("TransactionDebugGetExactRangeInfo", info.debugID.get())
|
/*TraceEvent("TransactionDebugGetExactRangeInfo", info.debugID.get())
|
||||||
.detail("ReqBeginKey", req.begin.getKey())
|
.detail("ReqBeginKey", req.begin.getKey())
|
||||||
.detail("ReqEndKey", req.end.getKey())
|
.detail("ReqEndKey", req.end.getKey())
|
||||||
|
@ -1700,12 +1653,8 @@ ACTOR Future<Standalone<RangeResultRef>> getExactRange( Database cx, Version ver
|
||||||
++cx->transactionPhysicalReadsCompleted;
|
++cx->transactionPhysicalReadsCompleted;
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getExactRange.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_AFTER)));
|
|
||||||
}
|
|
||||||
output.arena().dependsOn( rep.arena );
|
output.arena().dependsOn( rep.arena );
|
||||||
output.append( output.arena(), rep.data.begin(), rep.data.size() );
|
output.append( output.arena(), rep.data.begin(), rep.data.size() );
|
||||||
|
|
||||||
|
@ -1964,9 +1913,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
||||||
try {
|
try {
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() ) {
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETRANGE_BEFORE)));
|
|
||||||
/*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get())
|
/*TraceEvent("TransactionDebugGetRangeInfo", info.debugID.get())
|
||||||
.detail("ReqBeginKey", req.begin.getKey())
|
.detail("ReqBeginKey", req.begin.getKey())
|
||||||
.detail("ReqEndKey", req.end.getKey())
|
.detail("ReqEndKey", req.end.getKey())
|
||||||
|
@ -2002,9 +1948,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
||||||
|
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() ) {
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size());
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.After");//.detail("SizeOf", rep.data.size());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETEXACTRANGE_AFTER)));
|
|
||||||
/*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get())
|
/*TraceEvent("TransactionDebugGetRangeDone", info.debugID.get())
|
||||||
.detail("ReqBeginKey", req.begin.getKey())
|
.detail("ReqBeginKey", req.begin.getKey())
|
||||||
.detail("ReqEndKey", req.end.getKey())
|
.detail("ReqEndKey", req.end.getKey())
|
||||||
|
@ -2092,9 +2035,6 @@ ACTOR Future<Standalone<RangeResultRef>> getRange( Database cx, Reference<Transa
|
||||||
} catch ( Error& e ) {
|
} catch ( Error& e ) {
|
||||||
if( info.debugID.present() ) {
|
if( info.debugID.present() ) {
|
||||||
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error");
|
g_traceBatch.addEvent("TransactionDebug", info.debugID.get().first(), "NativeAPI.getRange.Error");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(info.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETRANGE_ERROR)));
|
|
||||||
TraceEvent("TransactionDebugError", info.debugID.get()).error(e);
|
TraceEvent("TransactionDebugError", info.debugID.get()).error(e);
|
||||||
}
|
}
|
||||||
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
|
if (e.code() == error_code_wrong_shard_server || e.code() == error_code_all_alternatives_failed ||
|
||||||
|
@ -2783,9 +2723,6 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
||||||
commitID = nondeterministicRandom()->randomUniqueID();
|
commitID = nondeterministicRandom()->randomUniqueID();
|
||||||
g_traceBatch.addAttach("CommitAttachID", info.debugID.get().first(), commitID.get().first());
|
g_traceBatch.addAttach("CommitAttachID", info.debugID.get().first(), commitID.get().first());
|
||||||
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.Before");
|
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(),
|
|
||||||
CommitDebugTrace::NATIVEAPI_COMMIT_BEFORE)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
req.debugID = commitID;
|
req.debugID = commitID;
|
||||||
|
@ -2829,12 +2766,9 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
||||||
cx->transactionCommittedMutations += req.transaction.mutations.size();
|
cx->transactionCommittedMutations += req.transaction.mutations.size();
|
||||||
cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize();
|
cx->transactionCommittedMutationBytes += req.transaction.mutations.expectedSize();
|
||||||
|
|
||||||
if(info.debugID.present()) {
|
if(info.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
|
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(),
|
|
||||||
CommitDebugTrace::NATIVEAPI_COMMIT_AFTER)));
|
|
||||||
}
|
|
||||||
double latency = now() - startTime;
|
double latency = now() - startTime;
|
||||||
cx->commitLatencies.addSample(latency);
|
cx->commitLatencies.addSample(latency);
|
||||||
cx->latencies.addSample(now() - tr->startTime);
|
cx->latencies.addSample(now() - tr->startTime);
|
||||||
|
@ -2884,12 +2818,9 @@ ACTOR static Future<Void> tryCommit( Database cx, Reference<TransactionLogInfo>
|
||||||
if (info.debugID.present())
|
if (info.debugID.present())
|
||||||
TraceEvent(interval.end()).detail("Conflict", 1);
|
TraceEvent(interval.end()).detail("Conflict", 1);
|
||||||
|
|
||||||
if(info.debugID.present()) {
|
if(info.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
|
g_traceBatch.addEvent("CommitDebug", commitID.get().first(), "NativeAPI.commit.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(commitID.get().first(), now(),
|
|
||||||
CommitDebugTrace::NATIVEAPI_COMMIT_AFTER)));
|
|
||||||
}
|
|
||||||
throw not_committed();
|
throw not_committed();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3219,23 +3150,15 @@ void Transaction::setOption( FDBTransactionOptions::Option option, Optional<Stri
|
||||||
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Optional<UID> debugID ) {
|
ACTOR Future<GetReadVersionReply> getConsistentReadVersion( DatabaseContext *cx, uint32_t transactionCount, uint32_t flags, Optional<UID> debugID ) {
|
||||||
try {
|
try {
|
||||||
++cx->transactionReadVersionBatches;
|
++cx->transactionReadVersionBatches;
|
||||||
if( debugID.present() ) {
|
if( debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
|
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETCONSISTENTREADVERSION_BEFORE)));
|
|
||||||
}
|
|
||||||
loop {
|
loop {
|
||||||
state GetReadVersionRequest req( transactionCount, flags, debugID );
|
state GetReadVersionRequest req( transactionCount, flags, debugID );
|
||||||
choose {
|
choose {
|
||||||
when ( wait( cx->onMasterProxiesChanged() ) ) {}
|
when ( wait( cx->onMasterProxiesChanged() ) ) {}
|
||||||
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
|
when ( GetReadVersionReply v = wait( loadBalance( cx->getMasterProxies(flags & GetReadVersionRequest::FLAG_USE_PROVISIONAL_PROXIES), &MasterProxyInterface::getConsistentReadVersion, req, cx->taskID ) ) ) {
|
||||||
if( debugID.present() ) {
|
if( debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After");
|
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "NativeAPI.getConsistentReadVersion.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::NATIVEAPI_GETCONSISTENTREADVERSION_AFTER)));
|
|
||||||
}
|
|
||||||
ASSERT( v.version > 0 );
|
ASSERT( v.version > 0 );
|
||||||
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
|
cx->minAcceptableReadVersion = std::min(cx->minAcceptableReadVersion, v.version);
|
||||||
return v;
|
return v;
|
||||||
|
|
|
@ -25,7 +25,6 @@
|
||||||
#include "fdbserver/CoroFlow.h"
|
#include "fdbserver/CoroFlow.h"
|
||||||
#include "fdbserver/Knobs.h"
|
#include "fdbserver/Knobs.h"
|
||||||
#include "flow/Hash3.h"
|
#include "flow/Hash3.h"
|
||||||
#include "flow/FBTrace.h"
|
|
||||||
|
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#include "fdbserver/sqlite/sqliteInt.h"
|
#include "fdbserver/sqlite/sqliteInt.h"
|
||||||
|
@ -1531,22 +1530,12 @@ private:
|
||||||
};
|
};
|
||||||
void action( ReadValueAction& rv ) {
|
void action( ReadValueAction& rv ) {
|
||||||
//double t = timer();
|
//double t = timer();
|
||||||
if (rv.debugID.present()) {
|
if (rv.debugID.present()) g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::READER_GETVALUE_BEFORE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
rv.result.send( getCursor()->get().get(rv.key) );
|
rv.result.send( getCursor()->get().get(rv.key) );
|
||||||
++counter;
|
++counter;
|
||||||
|
|
||||||
if (rv.debugID.present()) {
|
if (rv.debugID.present()) g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
g_traceBatch.addEvent("GetValueDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::READER_GETVALUE_AFTER)));
|
|
||||||
}
|
|
||||||
//t = timer()-t;
|
//t = timer()-t;
|
||||||
//if (t >= 1.0) TraceEvent("ReadValueActionSlow",dbgid).detail("Elapsed", t);
|
//if (t >= 1.0) TraceEvent("ReadValueActionSlow",dbgid).detail("Elapsed", t);
|
||||||
}
|
}
|
||||||
|
@ -1561,22 +1550,12 @@ private:
|
||||||
};
|
};
|
||||||
void action( ReadValuePrefixAction& rv ) {
|
void action( ReadValuePrefixAction& rv ) {
|
||||||
//double t = timer();
|
//double t = timer();
|
||||||
if (rv.debugID.present()) {
|
if (rv.debugID.present()) g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::READER_GETVALUEPREFIX_BEFORE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
rv.result.send( getCursor()->get().getPrefix(rv.key, rv.maxLength) );
|
rv.result.send( getCursor()->get().getPrefix(rv.key, rv.maxLength) );
|
||||||
++counter;
|
++counter;
|
||||||
|
|
||||||
if (rv.debugID.present()) {
|
if (rv.debugID.present()) g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
g_traceBatch.addEvent("GetValuePrefixDebug", rv.debugID.get().first(), "Reader.After"); //.detail("TaskID", g_network->getCurrentTask());
|
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(rv.debugID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::READER_GETVALUEPREFIX_AFTER)));
|
|
||||||
}
|
|
||||||
//t = timer()-t;
|
//t = timer()-t;
|
||||||
//if (t >= 1.0) TraceEvent("ReadValuePrefixActionSlow",dbgid).detail("Elapsed", t);
|
//if (t >= 1.0) TraceEvent("ReadValuePrefixActionSlow",dbgid).detail("Elapsed", t);
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,7 +45,6 @@
|
||||||
#include "flow/ActorCollection.h"
|
#include "flow/ActorCollection.h"
|
||||||
#include "flow/Knobs.h"
|
#include "flow/Knobs.h"
|
||||||
#include "flow/Stats.h"
|
#include "flow/Stats.h"
|
||||||
#include "flow/FBTrace.h"
|
|
||||||
#include "flow/TDMetric.actor.h"
|
#include "flow/TDMetric.actor.h"
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
|
@ -247,12 +246,8 @@ ACTOR Future<Void> queueTransactionStartRequests(
|
||||||
req.reply.send(rep);
|
req.reply.send(rep);
|
||||||
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
|
TraceEvent(SevWarnAlways, "ProxyGRVThresholdExceeded").suppressFor(60);
|
||||||
} else {
|
} else {
|
||||||
if (req.debugID.present()) {
|
if (req.debugID.present())
|
||||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
|
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.queueTransactionStartRequests.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::MASTERPROXYSERVER_QUEUETRANSACTIONSTARTREQUESTS_BEFORE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (systemQueue->empty() && defaultQueue->empty() && batchQueue->empty()) {
|
if (systemQueue->empty() && defaultQueue->empty() && batchQueue->empty()) {
|
||||||
forwardPromise(GRVTimer, delayJittered(std::max(0.0, *GRVBatchTime - (now() - *lastGRVTime)), TaskPriority::ProxyGRVTimer));
|
forwardPromise(GRVTimer, delayJittered(std::max(0.0, *GRVBatchTime - (now() - *lastGRVTime)), TaskPriority::ProxyGRVTimer));
|
||||||
|
@ -556,9 +551,6 @@ ACTOR Future<Void> commitBatcher(ProxyCommitData *commitData, PromiseStream<std:
|
||||||
|
|
||||||
if(req.debugID.present()) {
|
if(req.debugID.present()) {
|
||||||
g_traceBatch.addEvent("CommitDebug", req.debugID.get().first(), "MasterProxyServer.batcher");
|
g_traceBatch.addEvent("CommitDebug", req.debugID.get().first(), "MasterProxyServer.batcher");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::MASTERPROXYSERVER_BATCHER)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!batch.size()) {
|
if(!batch.size()) {
|
||||||
|
@ -747,12 +739,8 @@ ACTOR Future<Void> commitBatch(
|
||||||
TraceEvent("SecondCommitBatch", self->dbgid).detail("DebugID", debugID.get());
|
TraceEvent("SecondCommitBatch", self->dbgid).detail("DebugID", debugID.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_BEFORE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
|
/////// Phase 1: Pre-resolution processing (CPU bound except waiting for a version # which is separately pipelined and *should* be available by now (unless empty commit); ordered; currently atomic but could yield)
|
||||||
|
|
||||||
|
@ -761,12 +749,8 @@ ACTOR Future<Void> commitBatch(
|
||||||
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
|
wait(self->latestLocalCommitBatchResolving.whenAtLeast(localBatchNumber-1));
|
||||||
state Future<Void> releaseDelay = delay(std::min(SERVER_KNOBS->MAX_PROXY_COMPUTE, batchOperations*self->commitComputePerOperation[latencyBucket]), TaskPriority::ProxyMasterVersionReply);
|
state Future<Void> releaseDelay = delay(std::min(SERVER_KNOBS->MAX_PROXY_COMPUTE, batchOperations*self->commitComputePerOperation[latencyBucket]), TaskPriority::ProxyMasterVersionReply);
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GettingCommitVersion");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_GETTINGCOMMITVERSION)));
|
|
||||||
}
|
|
||||||
|
|
||||||
GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
|
GetCommitVersionRequest req(self->commitVersionRequestNumber++, self->mostRecentProcessedRequestNumber, self->dbgid);
|
||||||
GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) );
|
GetCommitVersionReply versionReply = wait( brokenPromiseToNever(self->master.getCommitVersion.getReply(req, TaskPriority::ProxyMasterVersionReply)) );
|
||||||
|
@ -786,12 +770,8 @@ ACTOR Future<Void> commitBatch(
|
||||||
|
|
||||||
//TraceEvent("ProxyGotVer", self->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion);
|
//TraceEvent("ProxyGotVer", self->dbgid).detail("Commit", commitVersion).detail("Prev", prevVersion);
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.GotCommitVersion");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_GOTCOMMITVERSION)));
|
|
||||||
}
|
|
||||||
|
|
||||||
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
|
ResolutionRequestBuilder requests( self, commitVersion, prevVersion, self->version );
|
||||||
int conflictRangeCount = 0;
|
int conflictRangeCount = 0;
|
||||||
|
@ -823,12 +803,8 @@ ACTOR Future<Void> commitBatch(
|
||||||
/////// Phase 2: Resolution (waiting on the network; pipelined)
|
/////// Phase 2: Resolution (waiting on the network; pipelined)
|
||||||
state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) );
|
state vector<ResolveTransactionBatchReply> resolution = wait( getAll(replies) );
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterResolution");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterResolution");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERRESOLUTION)));
|
|
||||||
}
|
|
||||||
|
|
||||||
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be)
|
////// Phase 3: Post-resolution processing (CPU bound except for very rare situations; ordered; currently atomic but doesn't need to be)
|
||||||
TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber - 1); // Queuing post-resolution commit processing
|
TEST(self->latestLocalCommitBatchLogging.get() < localBatchNumber - 1); // Queuing post-resolution commit processing
|
||||||
|
@ -839,12 +815,8 @@ ACTOR Future<Void> commitBatch(
|
||||||
state double computeDuration = 0;
|
state double computeDuration = 0;
|
||||||
self->stats.txnCommitResolved += trs.size();
|
self->stats.txnCommitResolved += trs.size();
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.ProcessingMutations");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.ProcessingMutations");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_PROCESSINGMUTATIONS)));
|
|
||||||
}
|
|
||||||
|
|
||||||
state Arena arena;
|
state Arena arena;
|
||||||
state bool isMyFirstBatch = !self->version;
|
state bool isMyFirstBatch = !self->version;
|
||||||
|
@ -1141,12 +1113,8 @@ ACTOR Future<Void> commitBatch(
|
||||||
|
|
||||||
state LogSystemDiskQueueAdapter::CommitMessage msg = storeCommits.back().first.get();
|
state LogSystemDiskQueueAdapter::CommitMessage msg = storeCommits.back().first.get();
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterStoreCommits");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterStoreCommits");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERSTORECOMMITS)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// txnState (transaction subsystem state) tag: message extracted from log adapter
|
// txnState (transaction subsystem state) tag: message extracted from log adapter
|
||||||
bool firstMessage = true;
|
bool firstMessage = true;
|
||||||
|
@ -1220,12 +1188,8 @@ ACTOR Future<Void> commitBatch(
|
||||||
debug_advanceMinCommittedVersion(UID(), commitVersion);
|
debug_advanceMinCommittedVersion(UID(), commitVersion);
|
||||||
|
|
||||||
//TraceEvent("ProxyPushed", self->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion);
|
//TraceEvent("ProxyPushed", self->dbgid).detail("PrevVersion", prevVersion).detail("Version", commitVersion);
|
||||||
if (debugID.present()) {
|
if (debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterLogPush");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "MasterProxyServer.commitBatch.AfterLogPush");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::MASTERPROXYSERVER_COMMITBATCH_AFTERLOGPUSH)));
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto &p : storeCommits) {
|
for (auto &p : storeCommits) {
|
||||||
ASSERT(!p.second.isReady());
|
ASSERT(!p.second.isReady());
|
||||||
|
@ -1352,9 +1316,6 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present()) {
|
||||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.confirmEpochLive");
|
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.confirmEpochLive");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::MASTERPROXYSERVER_GETLIVECOMMITTEDVERSION_CONFIRMEPOCHLIVE)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
|
vector<GetReadVersionReply> versions = wait(getAll(proxyVersions));
|
||||||
|
@ -1371,9 +1332,6 @@ ACTOR Future<GetReadVersionReply> getLiveCommittedVersion(ProxyCommitData* commi
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present()) {
|
||||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
|
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.getLiveCommittedVersion.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::MASTERPROXYSERVER_GETLIVECOMMITTEDVERSION_AFTER)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
commitData->stats.txnStartOut += transactionCount;
|
commitData->stats.txnStartOut += transactionCount;
|
||||||
|
@ -1535,9 +1493,6 @@ ACTOR static Future<Void> transactionStarter(
|
||||||
|
|
||||||
if (debugID.present()) {
|
if (debugID.present()) {
|
||||||
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");
|
g_traceBatch.addEvent("TransactionDebug", debugID.get().first(), "MasterProxyServer.masterProxyServerCore.Broadcast");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::MASTERPROXYSERVER_MASTERPROXYSERVERCORE_BROADCAST)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < start.size(); i++) {
|
for (int i = 0; i < start.size(); i++) {
|
||||||
|
@ -1984,12 +1939,8 @@ ACTOR Future<Void> masterProxyServerCore(
|
||||||
}
|
}
|
||||||
when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
|
when(GetRawCommittedVersionRequest req = waitNext(proxy.getRawCommittedVersion.getFuture())) {
|
||||||
//TraceEvent("ProxyGetRCV", proxy.id());
|
//TraceEvent("ProxyGetRCV", proxy.id());
|
||||||
if (req.debugID.present()) {
|
if (req.debugID.present())
|
||||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
|
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "MasterProxyServer.masterProxyServerCore.GetRawCommittedVersion");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::MASTERPROXYSERVER_MASTERPROXYSERVERCORE_GETRAWCOMMITTEDVERSION)));
|
|
||||||
}
|
|
||||||
GetReadVersionReply rep;
|
GetReadVersionReply rep;
|
||||||
rep.locked = commitData.locked;
|
rep.locked = commitData.locked;
|
||||||
rep.metadataVersion = commitData.metadataVersion;
|
rep.metadataVersion = commitData.metadataVersion;
|
||||||
|
|
|
@ -30,7 +30,6 @@
|
||||||
#include "fdbserver/ConflictSet.h"
|
#include "fdbserver/ConflictSet.h"
|
||||||
#include "fdbserver/StorageMetrics.h"
|
#include "fdbserver/StorageMetrics.h"
|
||||||
#include "fdbclient/SystemData.h"
|
#include "fdbclient/SystemData.h"
|
||||||
#include "flow/FBTrace.h"
|
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
@ -116,9 +115,6 @@ ACTOR Future<Void> resolveBatch(
|
||||||
debugID = nondeterministicRandom()->randomUniqueID();
|
debugID = nondeterministicRandom()->randomUniqueID();
|
||||||
g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), debugID.get().first());
|
g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), debugID.get().first());
|
||||||
g_traceBatch.addEvent("CommitDebug",debugID.get().first(),"Resolver.resolveBatch.Before");
|
g_traceBatch.addEvent("CommitDebug",debugID.get().first(),"Resolver.resolveBatch.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::RESOLVER_RESOLVEBATCH_BEFORE)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*TraceEvent("ResolveBatchStart", self->dbgid).detail("From", proxyAddress).detail("Version", req.version).detail("PrevVersion", req.prevVersion).detail("StateTransactions", req.txnStateTransactions.size())
|
/*TraceEvent("ResolveBatchStart", self->dbgid).detail("From", proxyAddress).detail("Version", req.version).detail("PrevVersion", req.prevVersion).detail("StateTransactions", req.txnStateTransactions.size())
|
||||||
|
@ -136,9 +132,6 @@ ACTOR Future<Void> resolveBatch(
|
||||||
|
|
||||||
if(debugID.present()) {
|
if(debugID.present()) {
|
||||||
g_traceBatch.addEvent("CommitDebug",debugID.get().first(),"Resolver.resolveBatch.AfterQueueSizeCheck");
|
g_traceBatch.addEvent("CommitDebug",debugID.get().first(),"Resolver.resolveBatch.AfterQueueSizeCheck");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTERQUEUESIZECHECK)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
@ -171,12 +164,8 @@ ACTOR Future<Void> resolveBatch(
|
||||||
Version firstUnseenVersion = proxyInfo.lastVersion + 1;
|
Version firstUnseenVersion = proxyInfo.lastVersion + 1;
|
||||||
proxyInfo.lastVersion = req.version;
|
proxyInfo.lastVersion = req.version;
|
||||||
|
|
||||||
if(req.debugID.present()) {
|
if(req.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.AfterOrderer");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTERORDERER)));
|
|
||||||
}
|
|
||||||
|
|
||||||
ResolveTransactionBatchReply& reply = proxyInfo.outstandingBatches[req.version];
|
ResolveTransactionBatchReply& reply = proxyInfo.outstandingBatches[req.version];
|
||||||
|
|
||||||
|
@ -287,12 +276,8 @@ ACTOR Future<Void> resolveBatch(
|
||||||
self->checkNeededVersion.trigger();
|
self->checkNeededVersion.trigger();
|
||||||
}
|
}
|
||||||
|
|
||||||
if(req.debugID.present()) {
|
if(req.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.After");
|
g_traceBatch.addEvent("CommitDebug", debugID.get().first(), "Resolver.resolveBatch.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::RESOLVER_RESOLVEBATCH_AFTER)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
TEST(true); // Duplicate resolve batch request
|
TEST(true); // Duplicate resolve batch request
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -21,7 +21,6 @@
|
||||||
#include "flow/Hash3.h"
|
#include "flow/Hash3.h"
|
||||||
#include "flow/Stats.h"
|
#include "flow/Stats.h"
|
||||||
#include "flow/UnitTest.h"
|
#include "flow/UnitTest.h"
|
||||||
#include "flow/FBTrace.h"
|
|
||||||
#include "fdbclient/NativeAPI.actor.h"
|
#include "fdbclient/NativeAPI.actor.h"
|
||||||
#include "fdbclient/Notified.h"
|
#include "fdbclient/Notified.h"
|
||||||
#include "fdbclient/KeyRangeMap.h"
|
#include "fdbclient/KeyRangeMap.h"
|
||||||
|
@ -1767,9 +1766,6 @@ ACTOR Future<Void> tLogCommit(
|
||||||
tlogDebugID = nondeterministicRandom()->randomUniqueID();
|
tlogDebugID = nondeterministicRandom()->randomUniqueID();
|
||||||
g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), tlogDebugID.get().first());
|
g_traceBatch.addAttach("CommitAttachID", req.debugID.get().first(), tlogDebugID.get().first());
|
||||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.BeforeWaitForVersion");
|
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.BeforeWaitForVersion");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::TLOG_TLOGCOMMIT_BEFOREWAITFORVERSION)));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, req.minKnownCommittedVersion);
|
logData->minKnownCommittedVersion = std::max(logData->minKnownCommittedVersion, req.minKnownCommittedVersion);
|
||||||
|
@ -1799,12 +1795,8 @@ ACTOR Future<Void> tLogCommit(
|
||||||
}
|
}
|
||||||
|
|
||||||
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
|
if (logData->version.get() == req.prevVersion) { // Not a duplicate (check relies on critical section between here self->version.set() below!)
|
||||||
if(req.debugID.present()) {
|
if(req.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
|
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::TLOG_TLOGCOMMIT_BEFORE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
|
//TraceEvent("TLogCommit", logData->logId).detail("Version", req.version);
|
||||||
commitMessages(self, logData, req.version, req.arena, req.messages);
|
commitMessages(self, logData, req.version, req.arena, req.messages);
|
||||||
|
@ -1827,12 +1819,8 @@ ACTOR Future<Void> tLogCommit(
|
||||||
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
|
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
|
||||||
logData->version.set( req.version );
|
logData->version.set( req.version );
|
||||||
|
|
||||||
if(req.debugID.present()) {
|
if(req.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
|
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.AfterTLogCommit");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::TLOG_TLOGCOMMIT_AFTERTLOGCOMMIT)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Send replies only once all prior messages have been received and committed.
|
// Send replies only once all prior messages have been received and committed.
|
||||||
state Future<Void> stopped = logData->stopCommit.onTrigger();
|
state Future<Void> stopped = logData->stopCommit.onTrigger();
|
||||||
|
@ -1844,12 +1832,8 @@ ACTOR Future<Void> tLogCommit(
|
||||||
return Void();
|
return Void();
|
||||||
}
|
}
|
||||||
|
|
||||||
if(req.debugID.present()) {
|
if(req.debugID.present())
|
||||||
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
|
g_traceBatch.addEvent("CommitDebug", tlogDebugID.get().first(), "TLog.tLogCommit.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<CommitDebugTrace>(new CommitDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
CommitDebugTrace::TLOG_TLOGCOMMIT_AFTER)));
|
|
||||||
}
|
|
||||||
|
|
||||||
req.reply.send( logData->durableKnownCommittedVersion );
|
req.reply.send( logData->durableKnownCommittedVersion );
|
||||||
return Void();
|
return Void();
|
||||||
|
@ -2115,9 +2099,6 @@ ACTOR Future<Void> serveTLogInterface( TLogData* self, TLogInterface tli, Refere
|
||||||
UID tlogDebugID = nondeterministicRandom()->randomUniqueID();
|
UID tlogDebugID = nondeterministicRandom()->randomUniqueID();
|
||||||
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first());
|
g_traceBatch.addAttach("TransactionAttachID", req.debugID.get().first(), tlogDebugID.first());
|
||||||
g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest");
|
g_traceBatch.addEvent("TransactionDebug", tlogDebugID.first(), "TLogServer.TLogConfirmRunningRequest");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::TLOGSERVER_TLOGCONFIRMRUNNINGREQUEST)));
|
|
||||||
}
|
}
|
||||||
if (!logData->stopped)
|
if (!logData->stopped)
|
||||||
req.reply.send(Void());
|
req.reply.send(Void());
|
||||||
|
|
|
@ -51,7 +51,6 @@
|
||||||
#include "fdbrpc/sim_validation.h"
|
#include "fdbrpc/sim_validation.h"
|
||||||
#include "fdbrpc/Smoother.h"
|
#include "fdbrpc/Smoother.h"
|
||||||
#include "flow/Stats.h"
|
#include "flow/Stats.h"
|
||||||
#include "flow/FBTrace.h"
|
|
||||||
#include "flow/TDMetric.actor.h"
|
#include "flow/TDMetric.actor.h"
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
@ -839,21 +838,13 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
|
||||||
// so we need to downgrade here
|
// so we need to downgrade here
|
||||||
wait( delay(0, TaskPriority::DefaultEndpoint) );
|
wait( delay(0, TaskPriority::DefaultEndpoint) );
|
||||||
|
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.DoRead"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::STORAGESERVER_GETVALUE_DOREAD)));
|
|
||||||
}
|
|
||||||
|
|
||||||
state Optional<Value> v;
|
state Optional<Value> v;
|
||||||
state Version version = wait( waitForVersion( data, req.version ) );
|
state Version version = wait( waitForVersion( data, req.version ) );
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::STORAGESERVER_GETVALUE_AFTERVERSION)));
|
|
||||||
}
|
|
||||||
|
|
||||||
state uint64_t changeCounter = data->shardChangeCounter;
|
state uint64_t changeCounter = data->shardChangeCounter;
|
||||||
|
|
||||||
|
@ -906,12 +897,8 @@ ACTOR Future<Void> getValueQ( StorageServer* data, GetValueRequest req ) {
|
||||||
data->metrics.notifyBytesReadPerKSecond(req.key, bytesReadPerKSecond);
|
data->metrics.notifyBytesReadPerKSecond(req.key, bytesReadPerKSecond);
|
||||||
}
|
}
|
||||||
|
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "getValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::STORAGESERVER_GETVALUE_AFTERREAD)));
|
|
||||||
}
|
|
||||||
|
|
||||||
GetValueReply reply(v);
|
GetValueReply reply(v);
|
||||||
reply.penalty = data->getPenalty();
|
reply.penalty = data->getPenalty();
|
||||||
|
@ -936,20 +923,12 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
|
||||||
try {
|
try {
|
||||||
++data->counters.watchQueries;
|
++data->counters.watchQueries;
|
||||||
|
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.Before"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_BEFORE)));
|
|
||||||
}
|
|
||||||
|
|
||||||
wait(success(waitForVersionNoTooOld(data, req.version)));
|
wait(success(waitForVersionNoTooOld(data, req.version)));
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterVersion"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_AFTERVERSION)));
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
try {
|
try {
|
||||||
|
@ -966,12 +945,8 @@ ACTOR Future<Void> watchValue_impl( StorageServer* data, WatchValueRequest req )
|
||||||
|
|
||||||
debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
|
debugMutation("ShardWatchValue", latest, MutationRef(MutationRef::DebugKey, req.key, reply.value.present() ? StringRef( reply.value.get() ) : LiteralStringRef("<null>") ) );
|
||||||
|
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("WatchValueDebug", req.debugID.get().first(), "watchValueQ.AfterRead"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<WatchValueDebugTrace>(new WatchValueDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
WatchValueDebugTrace::STORAGESERVER_WATCHVALUE_AFTERREAD)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if( reply.value != req.value ) {
|
if( reply.value != req.value ) {
|
||||||
req.reply.send(WatchValueReply{ latest });
|
req.reply.send(WatchValueReply{ latest });
|
||||||
|
@ -1376,24 +1351,16 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
||||||
wait( delay(0, taskType) );
|
wait( delay(0, taskType) );
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
|
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_BEFORE)));
|
|
||||||
}
|
|
||||||
state Version version = wait( waitForVersion( data, req.version ) );
|
state Version version = wait( waitForVersion( data, req.version ) );
|
||||||
|
|
||||||
state uint64_t changeCounter = data->shardChangeCounter;
|
state uint64_t changeCounter = data->shardChangeCounter;
|
||||||
// try {
|
// try {
|
||||||
state KeyRange shard = getShardKeyRange( data, req.begin );
|
state KeyRange shard = getShardKeyRange( data, req.begin );
|
||||||
|
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
|
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterVersion");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERVERSION)));
|
|
||||||
}
|
|
||||||
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
|
//.detail("ShardBegin", shard.begin).detail("ShardEnd", shard.end);
|
||||||
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", "None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
|
//} catch (Error& e) { TraceEvent("WrongShardServer", data->thisServerID).detail("Begin", req.begin.toString()).detail("End", req.end.toString()).detail("Version", version).detail("Shard", "None").detail("In", "getKeyValues>getShardKeyRange"); throw e; }
|
||||||
|
|
||||||
|
@ -1408,12 +1375,8 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
||||||
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
|
state Future<Key> fEnd = req.end.isFirstGreaterOrEqual() ? Future<Key>(req.end.getKey()) : findKey( data, req.end, version, shard, &offset2 );
|
||||||
state Key begin = wait(fBegin);
|
state Key begin = wait(fBegin);
|
||||||
state Key end = wait(fEnd);
|
state Key end = wait(fEnd);
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
|
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterKeys");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERKEYS)));
|
|
||||||
}
|
|
||||||
//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
|
//.detail("Off1",offset1).detail("Off2",offset2).detail("ReqBegin",req.begin.getKey()).detail("ReqEnd",req.end.getKey());
|
||||||
|
|
||||||
// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
|
// Offsets of zero indicate begin/end keys in this shard, which obviously means we can answer the query
|
||||||
|
@ -1429,12 +1392,8 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
||||||
}
|
}
|
||||||
|
|
||||||
if (begin >= end) {
|
if (begin >= end) {
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Send");
|
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.Send");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_SEND)));
|
|
||||||
}
|
|
||||||
//.detail("Begin",begin).detail("End",end);
|
//.detail("Begin",begin).detail("End",end);
|
||||||
|
|
||||||
GetKeyValuesReply none;
|
GetKeyValuesReply none;
|
||||||
|
@ -1450,12 +1409,8 @@ ACTOR Future<Void> getKeyValues( StorageServer* data, GetKeyValuesRequest req )
|
||||||
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
|
GetKeyValuesReply _r = wait( readRange(data, version, KeyRangeRef(begin, end), req.limit, &remainingLimitBytes) );
|
||||||
GetKeyValuesReply r = _r;
|
GetKeyValuesReply r = _r;
|
||||||
|
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterReadRange");
|
g_traceBatch.addEvent("TransactionDebug", req.debugID.get().first(), "storageserver.getKeyValues.AfterReadRange");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
TransactionDebugTrace::STORAGESERVER_GETKEYVALUES_AFTERREADRANGE)));
|
|
||||||
}
|
|
||||||
//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
|
//.detail("Begin",begin).detail("End",end).detail("SizeOf",r.data.size());
|
||||||
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(begin, std::min<KeyRef>(req.begin.getKey(), req.end.getKey())), std::max<KeyRef>(end, std::max<KeyRef>(req.begin.getKey(), req.end.getKey())) ) );
|
data->checkChangeCounter( changeCounter, KeyRangeRef( std::min<KeyRef>(begin, std::min<KeyRef>(req.begin.getKey(), req.end.getKey())), std::max<KeyRef>(end, std::max<KeyRef>(req.begin.getKey(), req.end.getKey())) ) );
|
||||||
if (EXPENSIVE_VALIDATION) {
|
if (EXPENSIVE_VALIDATION) {
|
||||||
|
@ -3647,12 +3602,8 @@ ACTOR Future<Void> storageServerCore( StorageServer* self, StorageServerInterfac
|
||||||
}
|
}
|
||||||
when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
|
when( GetValueRequest req = waitNext(ssi.getValue.getFuture()) ) {
|
||||||
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
|
// Warning: This code is executed at extremely high priority (TaskPriority::LoadBalancedEndpoint), so downgrade before doing real work
|
||||||
if( req.debugID.present() ) {
|
if( req.debugID.present() )
|
||||||
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
|
g_traceBatch.addEvent("GetValueDebug", req.debugID.get().first(), "storageServer.received"); //.detail("TaskID", g_network->getCurrentTask());
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<GetValueDebugTrace>(new GetValueDebugTrace(req.debugID.get().first(), now(),
|
|
||||||
GetValueDebugTrace::STORAGESERVER_GETVALUE_RECEIVED)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
|
if (SHORT_CIRCUT_ACTUAL_STORAGE && normalKeys.contains(req.key))
|
||||||
req.reply.send(GetValueReply());
|
req.reply.send(GetValueReply());
|
||||||
|
|
|
@ -31,7 +31,6 @@
|
||||||
#include "fdbserver/ClusterRecruitmentInterface.h"
|
#include "fdbserver/ClusterRecruitmentInterface.h"
|
||||||
#include "fdbclient/ReadYourWrites.h"
|
#include "fdbclient/ReadYourWrites.h"
|
||||||
#include "flow/TDMetric.actor.h"
|
#include "flow/TDMetric.actor.h"
|
||||||
#include "flow/FBTrace.h"
|
|
||||||
#include "flow/actorcompiler.h" // This must be the last #include.
|
#include "flow/actorcompiler.h" // This must be the last #include.
|
||||||
|
|
||||||
const int sampleSize = 10000;
|
const int sampleSize = 10000;
|
||||||
|
@ -619,9 +618,6 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
debugID = deterministicRandom()->randomUniqueID();
|
debugID = deterministicRandom()->randomUniqueID();
|
||||||
tr.debugTransaction(debugID);
|
tr.debugTransaction(debugID);
|
||||||
g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.Before");
|
g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.Before");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.first(), now(),
|
|
||||||
TransactionDebugTrace::READWRITE_RANDOMREADWRITECLIENT_BEFORE)));
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
debugID = UID();
|
debugID = UID();
|
||||||
|
@ -694,12 +690,8 @@ struct ReadWriteWorkload : KVWorkload {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(debugID != UID()) {
|
if(debugID != UID())
|
||||||
g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.After");
|
g_traceBatch.addEvent("TransactionDebug", debugID.first(), "ReadWrite.randomReadWriteClient.After");
|
||||||
//FIXME
|
|
||||||
fbTrace(Reference<TransactionDebugTrace>(new TransactionDebugTrace(debugID.first(), now(),
|
|
||||||
TransactionDebugTrace::READWRITE_RANDOMREADWRITECLIENT_AFTER)));
|
|
||||||
}
|
|
||||||
|
|
||||||
tr = Trans();
|
tr = Trans();
|
||||||
|
|
||||||
|
|
|
@ -44,20 +44,17 @@ public:
|
||||||
constexpr static FileIdentifier file_identifier = 617894;
|
constexpr static FileIdentifier file_identifier = 617894;
|
||||||
enum codeLocation {
|
enum codeLocation {
|
||||||
STORAGESERVER_GETVALUE_RECEIVED = 0,
|
STORAGESERVER_GETVALUE_RECEIVED = 0,
|
||||||
STORAGESERVER_GETVALUE_DOREAD = 1,
|
STORAGESERVER_GETVALUE_DO_READ = 1,
|
||||||
STORAGESERVER_GETVALUE_AFTERVERSION = 2,
|
STORAGESERVER_GETVALUE_AFTER_VERSION = 2,
|
||||||
STORAGESERVER_GETVALUE_AFTERREAD = 3,
|
STORAGESERVER_GETVALUE_AFTER_READ = 3,
|
||||||
STORAGECACHE_GETVALUE_RECEIVED = 4,
|
STORAGECACHE_GETVALUE_RECEIVED = 4,
|
||||||
STORAGECACHE_GETVALUE_DOREAD = 5,
|
STORAGECACHE_GETVALUE_DO_READ = 5,
|
||||||
STORAGECACHE_GETVALUE_AFTERVERSION = 6,
|
STORAGECACHE_GETVALUE_AFTER_VERSION = 6,
|
||||||
STORAGECACHE_GETVALUE_AFTERREAD = 7,
|
STORAGECACHE_GETVALUE_AFTER_READ = 7,
|
||||||
READER_GETVALUE_BEFORE = 8,
|
READER_GETVALUE_BEFORE = 8,
|
||||||
READER_GETVALUE_AFTER = 9,
|
READER_GETVALUE_AFTER = 9,
|
||||||
READER_GETVALUEPREFIX_BEFORE = 10,
|
READER_GETVALUEPREFIX_BEFORE = 10,
|
||||||
READER_GETVALUEPREFIX_AFTER = 11,
|
READER_GETVALUEPREFIX_AFTER = 11
|
||||||
NATIVEAPI_GETVALUE_BEFORE = 12,
|
|
||||||
NATIVEAPI_GETVALUE_AFTER = 13,
|
|
||||||
NATIVEAPI_GETVALUE_ERROR = 14
|
|
||||||
};
|
};
|
||||||
|
|
||||||
uint64_t id;
|
uint64_t id;
|
||||||
|
@ -77,10 +74,10 @@ public:
|
||||||
constexpr static FileIdentifier file_identifier = 14486715;
|
constexpr static FileIdentifier file_identifier = 14486715;
|
||||||
enum codeLocation {
|
enum codeLocation {
|
||||||
STORAGESERVER_WATCHVALUE_BEFORE = 1,
|
STORAGESERVER_WATCHVALUE_BEFORE = 1,
|
||||||
STORAGESERVER_WATCHVALUE_AFTERVERSION = 2,
|
STORAGESERVER_WATCHVALUE_AFTER_VERSION = 2,
|
||||||
STORAGESERVER_WATCHVALUE_AFTERREAD = 3,
|
STORAGESERVER_WATCHVALUE_AFTER_READ = 3,
|
||||||
NATIVEAPI_WATCHVALUE_BEFORE = 4,
|
NATIVEAPI_WATCHVALUE_BEFORE = 4,
|
||||||
NATIVEAPI_WATCHVALUE_AFTER = 5
|
NATIVEAPI_WATCHVALUE_AFTER_READ = 5
|
||||||
};
|
};
|
||||||
|
|
||||||
uint64_t id;
|
uint64_t id;
|
||||||
|
@ -102,9 +99,9 @@ public:
|
||||||
STORAGESERVER_COMMIT_BEORE = 0,
|
STORAGESERVER_COMMIT_BEORE = 0,
|
||||||
STORAGESERVER_COMMIT_AFTER_VERSION = 1,
|
STORAGESERVER_COMMIT_AFTER_VERSION = 1,
|
||||||
STORAGESERVER_COMMIT_AFTER_READ = 2,
|
STORAGESERVER_COMMIT_AFTER_READ = 2,
|
||||||
NATIVEAPI_COMMIT_BEFORE = 3,
|
NATIVEAPI_COMMIT_BEORE = 3,
|
||||||
NATIVEAPI_COMMIT_AFTER = 4,
|
NATIVEAPI_COMMIT_AFTER = 4,
|
||||||
MASTERPROXYSERVER_BATCHER = 5,
|
MASTERROXYSERVER_BATCHER = 5,
|
||||||
MASTERPROXYSERVER_COMMITBATCH_BEFORE = 6,
|
MASTERPROXYSERVER_COMMITBATCH_BEFORE = 6,
|
||||||
MASTERPROXYSERVER_COMMITBATCH_GETTINGCOMMITVERSION = 7,
|
MASTERPROXYSERVER_COMMITBATCH_GETTINGCOMMITVERSION = 7,
|
||||||
MASTERPROXYSERVER_COMMITBATCH_GOTCOMMITVERSION = 8,
|
MASTERPROXYSERVER_COMMITBATCH_GOTCOMMITVERSION = 8,
|
||||||
|
|
Loading…
Reference in New Issue